This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch iqv2-framework
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/iqv2-framework by this push:
     new 1009b84  consolidate Position
1009b84 is described below

commit 1009b8447fb40d2f373f1ebe88b638a117852351
Author: John Roesler <[email protected]>
AuthorDate: Tue Nov 30 13:51:55 2021 -0600

    consolidate Position
---
 .../state/internals/InMemoryKeyValueStore.java     |  3 +-
 .../kafka/streams/state/internals/Position.java    | 96 ----------------------
 .../streams/state/internals/RocksDBStore.java      |  3 +-
 .../streams/integration/IQv2IntegrationTest.java   |  7 ++
 .../state/internals/InMemoryKeyValueStoreTest.java |  3 +-
 .../streams/state/internals/PositionTest.java      | 66 ---------------
 .../streams/state/internals/RocksDBStoreTest.java  |  3 +-
 7 files changed, 15 insertions(+), 166 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 7b190c1..147b456 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
@@ -135,7 +136,7 @@ public class InMemoryKeyValueStore implements 
KeyValueStore<Bytes, byte[]> {
 
         if (context != null && context.recordMetadata().isPresent()) {
             final RecordMetadata meta = context.recordMetadata().get();
-            position = position.update(meta.topic(), meta.partition(), 
meta.offset());
+            position = position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
deleted file mode 100644
index dfb1c61..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/Position.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state.internals;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.BiConsumer;
-
-public class Position {
-    private final ConcurrentMap<String, ConcurrentMap<Integer, AtomicLong>> 
position;
-
-    public static Position emptyPosition() {
-        final HashMap<String, Map<Integer, Long>> pos = new HashMap<>();
-        return new Position(pos);
-    }
-
-    public static Position fromMap(final Map<String, Map<Integer, Long>> map) {
-        return new Position(map);
-    }
-
-    private Position(final Map<String, Map<Integer, Long>> other) {
-        this.position = new ConcurrentHashMap<>();
-        merge(other, (t, e) -> update(t, e.getKey(), 
e.getValue().longValue()));
-    }
-
-    public Position update(final String topic, final int partition, final long 
offset) {
-        position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
-        final ConcurrentMap<Integer, AtomicLong> topicMap = 
position.get(topic);
-        topicMap.computeIfAbsent(partition, k -> new AtomicLong(0));
-        topicMap.get(partition).getAndAccumulate(offset, Math::max);
-        return this;
-    }
-
-    public void merge(final Position other) {
-        merge(other.position, (a, b) -> update(a, b.getKey(), 
b.getValue().longValue()));
-    }
-
-    @Override
-    public String toString() {
-        return "Position{" +
-                "position=" + position +
-                '}';
-    }
-
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final Position other = (Position) o;
-        final HashMap<String, HashMap<Integer, Long>> position1 = new 
HashMap<>();
-        merge(position, (t, e) -> position1.computeIfAbsent(t, k -> new 
HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
-        final HashMap<String, HashMap<Integer, Long>> position2 = new 
HashMap<>();
-        merge(other.position, (t, e) -> position2.computeIfAbsent(t, k -> new 
HashMap<Integer, Long>()).put(e.getKey(), e.getValue().longValue()));
-
-        return Objects.equals(position1, position2);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(position);
-    }
-
-    private void merge(final Map<String, ? extends Map<Integer, ? extends 
Number>> other, final BiConsumer<String, Entry<Integer, ? extends Number>> 
func) {
-        for (final Entry<String, ? extends Map<Integer, ? extends Number>> 
entry : other.entrySet()) {
-            final String topic = entry.getKey();
-            final Map<Integer, ? extends Number> inputMap = entry.getValue();
-            for (final Entry<Integer, ? extends Number> topicEntry : 
inputMap.entrySet()) {
-                func.accept(topic, topicEntry);
-            }
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 117b9bd..2d51472 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryResult;
@@ -307,7 +308,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, 
byte[]>, BatchWritingS
 
         if (context != null && context.recordMetadata().isPresent()) {
             final RecordMetadata meta = context.recordMetadata().get();
-            position = position.update(meta.topic(), meta.partition(), 
meta.offset());
+            position = position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 7d33c74..fdfaf93 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -39,6 +39,7 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import org.apache.kafka.streams.query.FailureReason;
@@ -336,8 +337,14 @@ public class IQv2IntegrationTest {
                             return STORE_NAME;
                         }
 
+                        @Deprecated
                         @Override
                         public void init(final ProcessorContext context, final 
StateStore root) {
+                            throw new UnsupportedOperationException();
+                        }
+
+                        @Override
+                        public void init(final StateStoreContext context, 
final StateStore root) {
                             context.register(root, (key, value) -> 
put(Bytes.wrap(key), value));
                             this.open = true;
                         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index dc162d4..b23131a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
 import org.apache.kafka.streams.state.Stores;
@@ -265,7 +266,7 @@ public class InMemoryKeyValueStoreTest extends 
AbstractKeyValueStoreTest {
         long offset = 0;
         for (final KeyValue<Bytes, byte[]> k : entries) {
             inMemoryKeyValueStore.put(k.key, k.value);
-            expected.update("input", 0, offset);
+            expected.withComponent("input", 0, offset);
             offset++;
         }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
deleted file mode 100644
index e512609..0000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/PositionTest.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-
-import java.io.IOException;
-import org.junit.Test;
-
-public class PositionTest {
-
-    private final String topic = "topic";
-
-    @Test
-    public void shouldMatchOnEqual() throws IOException {
-        final Position position1 = Position.emptyPosition();
-        final Position position2 = Position.emptyPosition();
-        position1.update("topic1", 0, 1);
-        position2.update("topic1", 0, 1);
-
-        position1.update("topic1", 1, 2);
-        position2.update("topic1", 1, 2);
-
-        position1.update("topic1", 2, 1);
-        position2.update("topic1", 2, 1);
-
-        position1.update("topic2", 0, 0);
-        position2.update("topic2", 0, 0);
-
-        assertEquals(position1, position2);
-    }
-
-    @Test
-    public void shouldNotMatchOnUnEqual() throws IOException {
-        final Position position1 = Position.emptyPosition();
-        final Position position2 = Position.emptyPosition();
-        position1.update("topic1", 0, 1);
-        position2.update("topic1", 0, 1);
-
-        position1.update("topic1", 1, 2);
-
-        position1.update("topic1", 2, 1);
-        position2.update("topic1", 2, 1);
-
-        position1.update("topic2", 0, 0);
-        position2.update("topic2", 0, 0);
-
-        assertNotEquals(position1, position2);
-    }
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index 864500a..64178c1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -38,6 +38,7 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.RocksDBConfigSetter;
@@ -407,7 +408,7 @@ public class RocksDBStoreTest extends 
AbstractKeyValueStoreTest {
         long offset = 0;
         for (final KeyValue<Bytes, byte[]> k : entries) {
             rocksDBStore.put(k.key, k.value);
-            expected.update("input", 0, offset);
+            expected.withComponent("input", 0, offset);
             offset++;
         }
 

Reply via email to