This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e3bbe63c1d KAFKA-14491: [9/N] Add versioned bytes store and supplier 
(#13250)
2e3bbe63c1d is described below

commit 2e3bbe63c1d7a5484d0475c57e82011fada9cfa5
Author: Victoria Xia <[email protected]>
AuthorDate: Fri Feb 17 11:06:04 2023 -0800

    KAFKA-14491: [9/N] Add versioned bytes store and supplier (#13250)
    
    As part of introducing versioned key-value stores in KIP-889, we'd like a 
way to represent a versioned key-value store (VersionedKeyValueStore<Bytes, 
byte[]>) as a regular key-value store (KeyValueStore<Bytes, byte[]>) in order 
to be compatible with existing DSL methods for passing key-value stores, e.g., 
StreamsBuilder#table() and KTable methods, which are explicitly typed to accept 
Materialized<K, V, KeyValueStore<Bytes, byte[]>>. This way, we do not need to 
introduce new versions of [...]
    
    This PR introduces the new VersionedBytesStore extends KeyValueStore<Bytes, 
byte[]> interface for this purpose, along with the corresponding supplier 
(VersionedBytesStoreSupplier) and implementation 
(RocksDbVersionedKeyValueBytesStoreSupplier). The 
RocksDbVersionedKeyValueBytesStoreSupplier implementation leverages an adapter 
(VersionedKeyValueToBytesStoreAdapter) to assist in converting from 
VersionedKeyValueStore to VersionedBytesStore.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../kafka/streams/state/VersionedBytesStore.java   |  36 ++++
 .../streams/state/VersionedBytesStoreSupplier.java |  46 +++++
 ...RocksDbVersionedKeyValueBytesStoreSupplier.java |  82 +++++++++
 .../VersionedKeyValueToBytesStoreAdapter.java      | 196 +++++++++++++++++++++
 ...sDbVersionedKeyValueBytesStoreSupplierTest.java |  53 ++++++
 5 files changed, 413 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java 
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
new file mode 100644
index 00000000000..c0d4ef68904
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A representation of a versioned key-value store as a {@link KeyValueStore} 
of type &lt;Bytes, byte[]&gt;.
+ * See {@link VersionedBytesStoreSupplier} for more.
+ */
+public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>, 
TimestampedBytesStore {
+
+    /**
+     * The analog of {@link VersionedKeyValueStore#get(Object, long)}.
+     */
+    byte[] get(Bytes key, long asOfTimestamp);
+
+    /**
+     * The analog of {@link VersionedKeyValueStore#delete(Object, long)}.
+     */
+    byte[] delete(Bytes key, long timestamp);
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
new file mode 100644
index 00000000000..5ba97335afa
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Predicate;
+
+/**
+ * A store supplier that can be used to create one or more versioned key-value 
stores,
+ * specifically, {@link VersionedBytesStore} instances.
+ * <p>
+ * Rather than representing the returned store as a {@link 
VersionedKeyValueStore} of
+ * type &lt;Bytes, byte[]&gt;, this supplier interface represents the returned 
store as a
+ * {@link KeyValueStore} of type &lt;Bytes, byte[]&gt; (via {@link 
VersionedBytesStore}) in order to be compatible with
+ * existing DSL methods for passing key-value stores such as {@link 
StreamsBuilder#table(String, Materialized)}
+ * and {@link KTable#filter(Predicate, Materialized)}. A {@code 
VersionedKeyValueStore<Bytes, byte[]>}
+ * is represented as a {@code KeyValueStore KeyValueStore<Bytes, byte[]>} by 
interpreting the
+ * value bytes as containing record timestamp information in addition to raw 
record values.
+ */
+public interface VersionedBytesStoreSupplier extends 
KeyValueBytesStoreSupplier {
+
+    /**
+     * Returns the history retention (in milliseconds) that stores created 
from this supplier will have.
+     * This value is used to set compaction configs on store changelog topics 
(if relevant).
+     *
+     * @return history retention, i.e., length of time that old record 
versions are available for
+     *         query from a versioned store
+     */
+    long historyRetentionMs();
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
new file mode 100644
index 00000000000..3efffd3a98b
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+
+public class RocksDbVersionedKeyValueBytesStoreSupplier implements 
VersionedBytesStoreSupplier {
+
+    private final String name;
+    private final long historyRetentionMs;
+    private final long segmentIntervalMs;
+
+    public RocksDbVersionedKeyValueBytesStoreSupplier(final String name,
+                                                      final long 
historyRetentionMs
+    ) {
+        this(name, historyRetentionMs, 
defaultSegmentInterval(historyRetentionMs));
+    }
+
+    public RocksDbVersionedKeyValueBytesStoreSupplier(final String name,
+                                                      final long 
historyRetentionMs,
+                                                      final long 
segmentIntervalMs
+    ) {
+        this.name = name;
+        this.historyRetentionMs = historyRetentionMs;
+        this.segmentIntervalMs = segmentIntervalMs;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public long historyRetentionMs() {
+        return historyRetentionMs;
+    }
+
+    public long segmentIntervalMs() {
+        return segmentIntervalMs;
+    }
+
+    @Override
+    public KeyValueStore<Bytes, byte[]> get() {
+        return new VersionedKeyValueToBytesStoreAdapter(
+            new RocksDBVersionedStore(name, metricsScope(), 
historyRetentionMs, segmentIntervalMs)
+        );
+    }
+
+    @Override
+    public String metricsScope() {
+        return "rocksdb";
+    }
+
+    private static long defaultSegmentInterval(final long historyRetentionMs) {
+        // Selected somewhat arbitrarily. Optimal value depends heavily on 
data distribution
+        if (historyRetentionMs <= 60_000L) {
+            return Math.max(historyRetentionMs / 3, 2_000L);
+        } else if (historyRetentionMs <= 300_000L) {
+            return Math.max(historyRetentionMs / 5, 20_000L);
+        } else if (historyRetentionMs <= 3600_000L) {
+            return Math.max(historyRetentionMs / 12, 60_000L);
+        } else {
+            return Math.max(historyRetentionMs / 24, 300_000L);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
new file mode 100644
index 00000000000..3dbed8e97e7
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Adapts from {@link VersionedKeyValueStore} (user-friendly versioned store 
interface) to
+ * {@link KeyValueStore}. By representing a {@code VersionedKeyValueStore} as a
+ * {@code KeyValueStore}, this allows reuse of existing {@link StreamsBuilder} 
and {@link KTable}
+ * method interfaces which accept {@code Materialized<K, V, 
KeyValueStore<Bytes, byte[]>)}
+ * for versioned key-value stores.
+ */
+public class VersionedKeyValueToBytesStoreAdapter implements 
VersionedBytesStore {
+    private static final Serde<ValueAndTimestamp<byte[]>> 
VALUE_AND_TIMESTAMP_SERDE
+        = new NullableValueAndTimestampSerde<>(new ByteArraySerde());
+    private static final Serializer<ValueAndTimestamp<byte[]>> 
VALUE_AND_TIMESTAMP_SERIALIZER
+        = VALUE_AND_TIMESTAMP_SERDE.serializer();
+    private static final Deserializer<ValueAndTimestamp<byte[]>> 
VALUE_AND_TIMESTAMP_DESERIALIZER
+        = VALUE_AND_TIMESTAMP_SERDE.deserializer();
+
+    final VersionedKeyValueStore<Bytes, byte[]> inner;
+
+    public VersionedKeyValueToBytesStoreAdapter(final 
VersionedKeyValueStore<Bytes, byte[]> inner) {
+        this.inner = Objects.requireNonNull(inner);
+    }
+
+    @Override
+    public void put(final Bytes key, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            throw new IllegalArgumentException("Put to versioned store must 
always include timestamp, including for tombstones.");
+        }
+        final ValueAndTimestamp<byte[]> valueAndTimestamp
+            = VALUE_AND_TIMESTAMP_DESERIALIZER.deserialize(null, 
rawValueAndTimestamp);
+        inner.put(
+            key,
+            valueAndTimestamp.value(),
+            valueAndTimestamp.timestamp()
+        );
+    }
+
+    @Override
+    public byte[] get(final Bytes key) {
+        final VersionedRecord<byte[]> versionedRecord = inner.get(key);
+        return serializeAsBytes(versionedRecord);
+    }
+
+    @Override
+    public byte[] get(final Bytes key, final long asOfTimestamp) {
+        final VersionedRecord<byte[]> versionedRecord = inner.get(key, 
asOfTimestamp);
+        return serializeAsBytes(versionedRecord);
+    }
+
+    @Override
+    public byte[] delete(final Bytes key, final long timestamp) {
+        final VersionedRecord<byte[]> versionedRecord = inner.delete(key, 
timestamp);
+        return serializeAsBytes(versionedRecord);
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Deprecated
+    @Override
+    public void init(final ProcessorContext context, final StateStore root) {
+        inner.init(context, root);
+    }
+
+    @Override
+    public void init(final StateStoreContext context, final StateStore root) {
+        inner.init(context, root);
+    }
+
+    @Override
+    public void flush() {
+        inner.flush();
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return inner.persistent();
+    }
+
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query, final PositionBound 
positionBound, final QueryConfig config) {
+        return inner.query(query, positionBound, config);
+    }
+
+    @Override
+    public Position getPosition() {
+        return inner.getPosition();
+    }
+
+    @Override
+    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support putIfAbsent(key, value)");
+    }
+
+    @Override
+    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support putAll(entries)");
+    }
+
+    @Override
+    public byte[] delete(final Bytes key) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support delete(key). Use delete(key, timestamp) instead.");
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes 
to) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support range(from, to)");
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from, 
final Bytes to) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support reverseRange(from, to)");
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> all() {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support all()");
+    }
+
+    @Override
+    public KeyValueIterator<Bytes, byte[]> reverseAll() {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support reverseAll()");
+    }
+
+    @Override
+    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> 
prefixScan(final P prefix, final PS prefixKeySerializer) {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support prefixScan(prefix, prefixKeySerializer)");
+    }
+
+    @Override
+    public long approximateNumEntries() {
+        throw new UnsupportedOperationException("Versioned key-value stores do 
not support approximateNumEntries()");
+    }
+
+    private static byte[] serializeAsBytes(final VersionedRecord<byte[]> 
versionedRecord) {
+        if (versionedRecord == null) {
+            return null;
+        }
+        return VALUE_AND_TIMESTAMP_SERIALIZER.serialize(
+            null,
+            ValueAndTimestamp.make(versionedRecord.value(), 
versionedRecord.timestamp()));
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
new file mode 100644
index 00000000000..4dfb83e017b
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class RocksDbVersionedKeyValueBytesStoreSupplierTest {
+
+    private final static String STORE_NAME = "versioned_store";
+
+    @Test
+    public void shouldUseDefaultSegmentInterval() {
+        verifyExpectedSegmentInterval(0L, 2_000L);
+        verifyExpectedSegmentInterval(1_000L, 2_000L);
+        verifyExpectedSegmentInterval(6_000L, 2_000L);
+        verifyExpectedSegmentInterval(30_000L, 10_000L);
+        verifyExpectedSegmentInterval(60_000L, 20_000L);
+        verifyExpectedSegmentInterval(80_000L, 20_000L);
+        verifyExpectedSegmentInterval(100_000L, 20_000L);
+        verifyExpectedSegmentInterval(200_000L, 40_000L);
+        verifyExpectedSegmentInterval(300_000L, 60_000L);
+        verifyExpectedSegmentInterval(600_000L, 60_000L);
+        verifyExpectedSegmentInterval(720_000L, 60_000L);
+        verifyExpectedSegmentInterval(1200_000L, 100_000L);
+        verifyExpectedSegmentInterval(3600_000L, 300_000L);
+        verifyExpectedSegmentInterval(6000_000L, 300_000L);
+        verifyExpectedSegmentInterval(7200_000L, 300_000L);
+        verifyExpectedSegmentInterval(24 * 3600_000L, 3600_000L);
+    }
+
+    private void verifyExpectedSegmentInterval(final long historyRetention, 
final long expectedSegmentInterval) {
+        assertThat(
+            new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME, 
historyRetention).segmentIntervalMs(),
+            is(expectedSegmentInterval));
+    }
+}
\ No newline at end of file

Reply via email to