This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e3bbe63c1d KAFKA-14491: [9/N] Add versioned bytes store and supplier
(#13250)
2e3bbe63c1d is described below
commit 2e3bbe63c1d7a5484d0475c57e82011fada9cfa5
Author: Victoria Xia <[email protected]>
AuthorDate: Fri Feb 17 11:06:04 2023 -0800
KAFKA-14491: [9/N] Add versioned bytes store and supplier (#13250)
As part of introducing versioned key-value stores in KIP-889, we'd like a
way to represent a versioned key-value store (VersionedKeyValueStore<Bytes,
byte[]>) as a regular key-value store (KeyValueStore<Bytes, byte[]>) in order
to be compatible with existing DSL methods for passing key-value stores, e.g.,
StreamsBuilder#table() and KTable methods, which are explicitly typed to accept
Materialized<K, V, KeyValueStore<Bytes, byte[]>>. This way, we do not need to
introduce new versions of [...]
This PR introduces the new VersionedBytesStore extends KeyValueStore<Bytes,
byte[]> interface for this purpose, along with the corresponding supplier
(VersionedBytesStoreSupplier) and implementation
(RocksDbVersionedKeyValueBytesStoreSupplier). The
RocksDbVersionedKeyValueBytesStoreSupplier implementation leverages an adapter
(VersionedKeyValueToBytesStoreAdapter) to assist in converting from
VersionedKeyValueStore to VersionedBytesStore.
Reviewers: Matthias J. Sax <[email protected]>
---
.../kafka/streams/state/VersionedBytesStore.java | 36 ++++
.../streams/state/VersionedBytesStoreSupplier.java | 46 +++++
...RocksDbVersionedKeyValueBytesStoreSupplier.java | 82 +++++++++
.../VersionedKeyValueToBytesStoreAdapter.java | 196 +++++++++++++++++++++
...sDbVersionedKeyValueBytesStoreSupplierTest.java | 53 ++++++
5 files changed, 413 insertions(+)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
new file mode 100644
index 00000000000..c0d4ef68904
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.Bytes;
+
+/**
+ * A representation of a versioned key-value store as a {@link KeyValueStore}
of type {@code <Bytes, byte[]>}.
+ * See {@link VersionedBytesStoreSupplier} for more.
+ */
+public interface VersionedBytesStore extends KeyValueStore<Bytes, byte[]>,
TimestampedBytesStore {
+
+ /**
+ * The analog of {@link VersionedKeyValueStore#get(Object, long)}.
+ */
+ byte[] get(Bytes key, long asOfTimestamp);
+
+ /**
+ * The analog of {@link VersionedKeyValueStore#delete(Object, long)}.
+ */
+ byte[] delete(Bytes key, long timestamp);
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
new file mode 100644
index 00000000000..5ba97335afa
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/VersionedBytesStoreSupplier.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Predicate;
+
+/**
+ * A store supplier that can be used to create one or more versioned key-value
stores,
+ * specifically, {@link VersionedBytesStore} instances.
+ * <p>
+ * Rather than representing the returned store as a {@link
VersionedKeyValueStore} of
+ * type {@code <Bytes, byte[]>}, this supplier interface represents the returned
store as a
+ * {@link KeyValueStore} of type {@code <Bytes, byte[]>} (via {@link
VersionedBytesStore}) in order to be compatible with
+ * existing DSL methods for passing key-value stores such as {@link
StreamsBuilder#table(String, Materialized)}
+ * and {@link KTable#filter(Predicate, Materialized)}. A {@code
VersionedKeyValueStore<Bytes, byte[]>}
+ * is represented as a {@code KeyValueStore<Bytes, byte[]>} by
interpreting the
+ * value bytes as containing record timestamp information in addition to raw
record values.
+ */
+public interface VersionedBytesStoreSupplier extends
KeyValueBytesStoreSupplier {
+
+ /**
+ * Returns the history retention (in milliseconds) that stores created
from this supplier will have.
+ * This value is used to set compaction configs on store changelog topics
(if relevant).
+ *
+ * @return history retention, i.e., length of time that old record
versions are available for
+ * query from a versioned store
+ */
+ long historyRetentionMs();
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
new file mode 100644
index 00000000000..3efffd3a98b
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
+
+public class RocksDbVersionedKeyValueBytesStoreSupplier implements
VersionedBytesStoreSupplier {
+
+ private final String name;
+ private final long historyRetentionMs;
+ private final long segmentIntervalMs;
+
+ public RocksDbVersionedKeyValueBytesStoreSupplier(final String name,
+ final long
historyRetentionMs
+ ) {
+ this(name, historyRetentionMs,
defaultSegmentInterval(historyRetentionMs));
+ }
+
+ public RocksDbVersionedKeyValueBytesStoreSupplier(final String name,
+ final long
historyRetentionMs,
+ final long
segmentIntervalMs
+ ) {
+ this.name = name;
+ this.historyRetentionMs = historyRetentionMs;
+ this.segmentIntervalMs = segmentIntervalMs;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public long historyRetentionMs() {
+ return historyRetentionMs;
+ }
+
+ public long segmentIntervalMs() {
+ return segmentIntervalMs;
+ }
+
+ @Override
+ public KeyValueStore<Bytes, byte[]> get() {
+ return new VersionedKeyValueToBytesStoreAdapter(
+ new RocksDBVersionedStore(name, metricsScope(),
historyRetentionMs, segmentIntervalMs)
+ );
+ }
+
+ @Override
+ public String metricsScope() {
+ return "rocksdb";
+ }
+
+ private static long defaultSegmentInterval(final long historyRetentionMs) {
+ // Selected somewhat arbitrarily. Optimal value depends heavily on
data distribution
+ if (historyRetentionMs <= 60_000L) {
+ return Math.max(historyRetentionMs / 3, 2_000L);
+ } else if (historyRetentionMs <= 300_000L) {
+ return Math.max(historyRetentionMs / 5, 20_000L);
+ } else if (historyRetentionMs <= 3600_000L) {
+ return Math.max(historyRetentionMs / 12, 60_000L);
+ } else {
+ return Math.max(historyRetentionMs / 24, 300_000L);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
new file mode 100644
index 00000000000..3dbed8e97e7
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/VersionedKeyValueToBytesStoreAdapter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * Adapts from {@link VersionedKeyValueStore} (user-friendly versioned store
interface) to
+ * {@link KeyValueStore}. By representing a {@code VersionedKeyValueStore} as a
+ * {@code KeyValueStore}, this allows reuse of existing {@link StreamsBuilder}
and {@link KTable}
+ * method interfaces which accept {@code Materialized<K, V,
KeyValueStore<Bytes, byte[]>>}
+ * for versioned key-value stores.
+ */
+public class VersionedKeyValueToBytesStoreAdapter implements
VersionedBytesStore {
+ private static final Serde<ValueAndTimestamp<byte[]>>
VALUE_AND_TIMESTAMP_SERDE
+ = new NullableValueAndTimestampSerde<>(new ByteArraySerde());
+ private static final Serializer<ValueAndTimestamp<byte[]>>
VALUE_AND_TIMESTAMP_SERIALIZER
+ = VALUE_AND_TIMESTAMP_SERDE.serializer();
+ private static final Deserializer<ValueAndTimestamp<byte[]>>
VALUE_AND_TIMESTAMP_DESERIALIZER
+ = VALUE_AND_TIMESTAMP_SERDE.deserializer();
+
+ final VersionedKeyValueStore<Bytes, byte[]> inner;
+
+ public VersionedKeyValueToBytesStoreAdapter(final
VersionedKeyValueStore<Bytes, byte[]> inner) {
+ this.inner = Objects.requireNonNull(inner);
+ }
+
+ @Override
+ public void put(final Bytes key, final byte[] rawValueAndTimestamp) {
+ if (rawValueAndTimestamp == null) {
+ throw new IllegalArgumentException("Put to versioned store must
always include timestamp, including for tombstones.");
+ }
+ final ValueAndTimestamp<byte[]> valueAndTimestamp
+ = VALUE_AND_TIMESTAMP_DESERIALIZER.deserialize(null,
rawValueAndTimestamp);
+ inner.put(
+ key,
+ valueAndTimestamp.value(),
+ valueAndTimestamp.timestamp()
+ );
+ }
+
+ @Override
+ public byte[] get(final Bytes key) {
+ final VersionedRecord<byte[]> versionedRecord = inner.get(key);
+ return serializeAsBytes(versionedRecord);
+ }
+
+ @Override
+ public byte[] get(final Bytes key, final long asOfTimestamp) {
+ final VersionedRecord<byte[]> versionedRecord = inner.get(key,
asOfTimestamp);
+ return serializeAsBytes(versionedRecord);
+ }
+
+ @Override
+ public byte[] delete(final Bytes key, final long timestamp) {
+ final VersionedRecord<byte[]> versionedRecord = inner.delete(key,
timestamp);
+ return serializeAsBytes(versionedRecord);
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Deprecated
+ @Override
+ public void init(final ProcessorContext context, final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void init(final StateStoreContext context, final StateStore root) {
+ inner.init(context, root);
+ }
+
+ @Override
+ public void flush() {
+ inner.flush();
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return inner.isOpen(); // delegate the open-state check; was persistent()
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
+ return inner.query(query, positionBound, config);
+ }
+
+ @Override
+ public Position getPosition() {
+ return inner.getPosition();
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support putIfAbsent(key, value)");
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support putAll(entries)");
+ }
+
+ @Override
+ public byte[] delete(final Bytes key) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support delete(key). Use delete(key, timestamp) instead.");
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes
to) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support range(from, to)");
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseRange(final Bytes from,
final Bytes to) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support reverseRange(from, to)");
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> all() {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support all()");
+ }
+
+ @Override
+ public KeyValueIterator<Bytes, byte[]> reverseAll() {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support reverseAll()");
+ }
+
+ @Override
+ public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]>
prefixScan(final P prefix, final PS prefixKeySerializer) {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support prefixScan(prefix, prefixKeySerializer)");
+ }
+
+ @Override
+ public long approximateNumEntries() {
+ throw new UnsupportedOperationException("Versioned key-value stores do
not support approximateNumEntries()");
+ }
+
+ private static byte[] serializeAsBytes(final VersionedRecord<byte[]>
versionedRecord) {
+ if (versionedRecord == null) {
+ return null;
+ }
+ return VALUE_AND_TIMESTAMP_SERIALIZER.serialize(
+ null,
+ ValueAndTimestamp.make(versionedRecord.value(),
versionedRecord.timestamp()));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
new file mode 100644
index 00000000000..4dfb83e017b
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDbVersionedKeyValueBytesStoreSupplierTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+public class RocksDbVersionedKeyValueBytesStoreSupplierTest {
+
+ private final static String STORE_NAME = "versioned_store";
+
+ @Test
+ public void shouldUseDefaultSegmentInterval() {
+ verifyExpectedSegmentInterval(0L, 2_000L);
+ verifyExpectedSegmentInterval(1_000L, 2_000L);
+ verifyExpectedSegmentInterval(6_000L, 2_000L);
+ verifyExpectedSegmentInterval(30_000L, 10_000L);
+ verifyExpectedSegmentInterval(60_000L, 20_000L);
+ verifyExpectedSegmentInterval(80_000L, 20_000L);
+ verifyExpectedSegmentInterval(100_000L, 20_000L);
+ verifyExpectedSegmentInterval(200_000L, 40_000L);
+ verifyExpectedSegmentInterval(300_000L, 60_000L);
+ verifyExpectedSegmentInterval(600_000L, 60_000L);
+ verifyExpectedSegmentInterval(720_000L, 60_000L);
+ verifyExpectedSegmentInterval(1200_000L, 100_000L);
+ verifyExpectedSegmentInterval(3600_000L, 300_000L);
+ verifyExpectedSegmentInterval(6000_000L, 300_000L);
+ verifyExpectedSegmentInterval(7200_000L, 300_000L);
+ verifyExpectedSegmentInterval(24 * 3600_000L, 3600_000L);
+ }
+
+ private void verifyExpectedSegmentInterval(final long historyRetention,
final long expectedSegmentInterval) {
+ assertThat(
+ new RocksDbVersionedKeyValueBytesStoreSupplier(STORE_NAME,
historyRetention).segmentIntervalMs(),
+ is(expectedSegmentInterval));
+ }
+}
\ No newline at end of file