This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/iqv2-framework by this push:
new 1009b84 consolidate Position
1009b84 is described below
commit 1009b8447fb40d2f373f1ebe88b638a117852351
Author: John Roesler <[email protected]>
AuthorDate: Tue Nov 30 13:51:55 2021 -0600
consolidate Position
---
.../state/internals/InMemoryKeyValueStore.java | 3 +-
.../kafka/streams/state/internals/Position.java | 96 ----------------------
.../streams/state/internals/RocksDBStore.java | 3 +-
.../streams/integration/IQv2IntegrationTest.java | 7 ++
.../state/internals/InMemoryKeyValueStoreTest.java | 3 +-
.../streams/state/internals/PositionTest.java | 66 ---------------
.../streams/state/internals/RocksDBStoreTest.java | 3 +-
7 files changed, 15 insertions(+), 166 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 7b190c1..147b456 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
@@ -135,7 +136,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
if (context != null && context.recordMetadata().isPresent()) {
final RecordMetadata meta = context.recordMetadata().get();
- position = position.update(meta.topic(), meta.partition(), meta.offset());
+ position = position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
deleted file mode 100644
index dfb1c61..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
-public class Position {
- private final ConcurrentMap<String, ConcurrentMap<Integer, AtomicLong>> position;
-
- public static Position emptyPosition() {
- final HashMap<String, Map<Integer, Long>> pos = new HashMap<>();
- return new Position(pos);
- }
-
- public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
- return new Position(map);
- }
-
- private Position(final Map<String, Map<Integer, Long>> other) {
- this.position = new ConcurrentHashMap<>();
- merge(other, (t, e) -> update(t, e.getKey(), e.getValue().longValue()));
- }
-
- public Position update(final String topic, final int partition, final long offset) {
- position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
- final ConcurrentMap<Integer, AtomicLong> topicMap = position.get(topic);
- topicMap.computeIfAbsent(partition, k -> new AtomicLong(0));
- topicMap.get(partition).getAndAccumulate(offset, Math::max);
- return this;
- }
-
- public void merge(final Position other) {
- merge(other.position, (a, b) -> update(a, b.getKey(), b.getValue().longValue()));
- }
-
- @Override
- public String toString() {
- return "Position{" +
- "position=" + position +
- '}';
- }
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final Position other = (Position) o;
- final HashMap<String, HashMap<Integer, Long>> position1 = new HashMap<>();
- merge(position, (t, e) -> position1.computeIfAbsent(t, k -> new HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
- final HashMap<String, HashMap<Integer, Long>> position2 = new HashMap<>();
- merge(other.position, (t, e) -> position2.computeIfAbsent(t, k -> new HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
-
- return Objects.equals(position1, position2);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(position);
- }
-
- private void merge(final Map<String, ? extends Map<Integer, ? extends Number>> other, final BiConsumer<String, Entry<Integer, ? extends Number>> func) {
- for (final Entry<String, ? extends Map<Integer, ? extends Number>> entry : other.entrySet()) {
- final String topic = entry.getKey();
- final Map<Integer, ? extends Number> inputMap = entry.getValue();
- for (final Entry<Integer, ? extends Number> topicEntry : inputMap.entrySet()) {
- func.accept(topic, topicEntry);
- }
- }
- }
-}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 117b9bd..2d51472 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
@@ -307,7 +308,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
if (context != null && context.recordMetadata().isPresent()) {
final RecordMetadata meta = context.recordMetadata().get();
- position = position.update(meta.topic(), meta.partition(), meta.offset());
+ position = position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 7d33c74..fdfaf93 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import org.apache.kafka.streams.query.FailureReason;
@@ -336,8 +337,14 @@ public class IQv2IntegrationTest {
return STORE_NAME;
}
+ @Deprecated
@Override
public void init(final ProcessorContext context, final StateStore root) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
context.register(root, (key, value) -> put(Bytes.wrap(key), value));
this.open = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index dc162d4..b23131a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
import org.apache.kafka.streams.state.Stores;
@@ -265,7 +266,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
long offset = 0;
for (final KeyValue<Bytes, byte[]> k : entries) {
inMemoryKeyValueStore.put(k.key, k.value);
- expected.update("input", 0, offset);
+ expected.withComponent("input", 0, offset);
offset++;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
deleted file mode 100644
index e512609..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-public class PositionTest {
-
- private final String topic = "topic";
-
- @Test
- public void shouldMatchOnEqual() throws IOException {
- final Position position1 = Position.emptyPosition();
- final Position position2 = Position.emptyPosition();
- position1.update("topic1", 0, 1);
- position2.update("topic1", 0, 1);
-
- position1.update("topic1", 1, 2);
- position2.update("topic1", 1, 2);
-
- position1.update("topic1", 2, 1);
- position2.update("topic1", 2, 1);
-
- position1.update("topic2", 0, 0);
- position2.update("topic2", 0, 0);
-
- assertEquals(position1, position2);
- }
-
- @Test
- public void shouldNotMatchOnUnEqual() throws IOException {
- final Position position1 = Position.emptyPosition();
- final Position position2 = Position.emptyPosition();
- position1.update("topic1", 0, 1);
- position2.update("topic1", 0, 1);
-
- position1.update("topic1", 1, 2);
-
- position1.update("topic1", 2, 1);
- position2.update("topic1", 2, 1);
-
- position1.update("topic2", 0, 0);
- position2.update("topic2", 0, 0);
-
- assertNotEquals(position1, position2);
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 864500a..64178c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -407,7 +408,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
long offset = 0;
for (final KeyValue<Bytes, byte[]> k : entries) {
rocksDBStore.put(k.key, k.value);
- expected.update("input", 0, offset);
+ expected.withComponent("input", 0, offset);
offset++;
}