This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b31f4bd [FLINK-22808][state/changelog] Log metadata
b31f4bd is described below
commit b31f4bde84caa4664cfa305f7890c9a35ff43821
Author: Roman Khachatryan <[email protected]>
AuthorDate: Mon Mar 29 13:28:58 2021 +0200
[FLINK-22808][state/changelog] Log metadata
---
.../state/changelog/AbstractStateChangeLogger.java | 44 ++++++-
.../changelog/ChangelogKeyedStateBackend.java | 17 ++-
.../state/changelog/KvStateChangeLoggerImpl.java | 6 +-
.../PriorityQueueStateChangeLoggerImpl.java | 6 +-
.../changelog/KvStateChangeLoggerImplTest.java | 70 +++++++++++
.../PriorityQueueStateChangeLoggerImplTest.java | 64 ++++++++++
.../state/changelog/StateChangeLoggerTestBase.java | 133 +++++++++++++++++++++
7 files changed, 330 insertions(+), 10 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
index 55ceaa7..da106c8 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/AbstractStateChangeLogger.java
@@ -18,8 +18,10 @@
package org.apache.flink.state.changelog;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
+import
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.function.ThrowingConsumer;
import javax.annotation.Nullable;
@@ -31,23 +33,31 @@ import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters.CURRENT_STATE_META_INFO_SNAPSHOT_VERSION;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_ELEMENT;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.ADD_OR_UPDATE_ELEMENT;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.CLEAR;
+import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_ELEMENT;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET;
import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.SET_INTERNAL;
import static org.apache.flink.util.Preconditions.checkNotNull;
abstract class AbstractStateChangeLogger<Key, Value, Ns> implements
StateChangeLogger<Value, Ns> {
+ static final int COMMON_KEY_GROUP = -1;
protected final StateChangelogWriter<?> stateChangelogWriter;
protected final InternalKeyContext<Key> keyContext;
+ protected final RegisteredStateMetaInfoBase metaInfo;
+ private boolean metaDataWritten = false;
public AbstractStateChangeLogger(
- StateChangelogWriter<?> stateChangelogWriter,
InternalKeyContext<Key> keyContext) {
+ StateChangelogWriter<?> stateChangelogWriter,
+ InternalKeyContext<Key> keyContext,
+ RegisteredStateMetaInfoBase metaInfo) {
this.stateChangelogWriter = checkNotNull(stateChangelogWriter);
this.keyContext = checkNotNull(keyContext);
+ this.metaInfo = checkNotNull(metaInfo);
}
@Override
@@ -103,7 +113,7 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeL
}
protected void log(StateChangeOperation op, Ns ns) throws IOException {
- // todo: log metadata (FLINK-22808)
+ logMetaIfNeeded();
stateChangelogWriter.append(keyContext.getCurrentKeyGroupIndex(),
serialize(op, ns, null));
}
@@ -112,11 +122,28 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeL
@Nullable ThrowingConsumer<DataOutputViewStreamWrapper,
IOException> dataWriter,
Ns ns)
throws IOException {
- // todo: log metadata (FLINK-22808)
+ logMetaIfNeeded();
stateChangelogWriter.append(
keyContext.getCurrentKeyGroupIndex(), serialize(op, ns,
dataWriter));
}
+ private void logMetaIfNeeded() throws IOException {
+ if (!metaDataWritten) {
+ // todo: add StateChangelogWriter.append() version without a keygroup
+ // when all callers and implementers are merged (FLINK-21356 or later)
+ stateChangelogWriter.append(
+ COMMON_KEY_GROUP,
+ serializeRaw(
+ out -> {
+ out.writeByte(METADATA.code);
+
out.writeInt(CURRENT_STATE_META_INFO_SNAPSHOT_VERSION);
+ StateMetaInfoSnapshotReadersWriters.getWriter()
+
.writeStateMetaInfoSnapshot(metaInfo.snapshot(), out);
+ }));
+ metaDataWritten = true;
+ }
+ }
+
private byte[] serialize(
StateChangeOperation op,
Ns ns,
@@ -125,6 +152,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeL
return serializeRaw(
wrapper -> {
wrapper.writeByte(op.code);
+ // todo: optimize in FLINK-22944 by either writing short code or grouping and
+ // writing once (same for key, ns)
+ wrapper.writeUTF(metaInfo.getName());
serializeScope(ns, wrapper);
if (dataWriter != null) {
dataWriter.accept(wrapper);
@@ -163,7 +193,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeL
/** Scope: key + namespace + element (e.g. user map remove or iterator remove). */
REMOVE_ELEMENT((byte) 7),
/** Scope: key + namespace, first element (e.g. priority queue poll). */
- REMOVE_FIRST_ELEMENT((byte) 8);
+ REMOVE_FIRST_ELEMENT((byte) 8),
+ /** State metadata (name, serializers, etc.). */
+ METADATA((byte) 9);
private final byte code;
StateChangeOperation(byte code) {
@@ -177,5 +209,9 @@ abstract class AbstractStateChangeLogger<Key, Value, Ns>
implements StateChangeL
public static StateChangeOperation byCode(byte opCode) {
return checkNotNull(BY_CODES.get(opCode), Byte.toString(opCode));
}
+
+ public byte getCode() {
+ return code;
+ }
}
}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 805ae95..e234e97 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -43,6 +43,8 @@ import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
@@ -257,7 +259,9 @@ class ChangelogKeyedStateBackend<K>
new PriorityQueueStateChangeLoggerImpl<>(
byteOrderedElementSerializer,
keyedStateBackend.getKeyContext(),
- stateChangelogWriter);
+ stateChangelogWriter,
+ new RegisteredPriorityQueueStateBackendMetaInfo<>(
+ stateName, byteOrderedElementSerializer));
return new ChangelogKeyGroupedPriorityQueue<>(
keyedStateBackend.create(stateName,
byteOrderedElementSerializer),
priorityQueueStateChangeLogger,
@@ -339,6 +343,14 @@ class ChangelogKeyedStateBackend<K>
stateDesc.getClass(), this.getClass());
throw new FlinkRuntimeException(message);
}
+ RegisteredKeyValueStateBackendMetaInfo<N, SV> meta =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ stateDesc.getType(),
+ stateDesc.getName(),
+ namespaceSerializer,
+ stateDesc.getSerializer(),
+
(StateSnapshotTransformer.StateSnapshotTransformFactory<SV>)
+ snapshotTransformFactory);
InternalKvState<K, N, SV> state =
keyedStateBackend.createInternalState(
@@ -349,7 +361,8 @@ class ChangelogKeyedStateBackend<K>
state.getNamespaceSerializer(),
state.getValueSerializer(),
keyedStateBackend.getKeyContext(),
- stateChangelogWriter);
+ stateChangelogWriter,
+ meta);
return stateFactory.create(state, kvStateChangeLogger);
}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
index ffbb670..5615ce7 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/KvStateChangeLoggerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
@@ -43,8 +44,9 @@ class KvStateChangeLoggerImpl<Key, Value, Ns> extends
AbstractStateChangeLogger<
TypeSerializer<Ns> namespaceSerializer,
TypeSerializer<Value> valueSerializer,
InternalKeyContext<Key> keyContext,
- StateChangelogWriter<?> stateChangelogWriter) {
- super(stateChangelogWriter, keyContext);
+ StateChangelogWriter<?> stateChangelogWriter,
+ RegisteredStateMetaInfoBase metaInfo) {
+ super(stateChangelogWriter, keyContext, metaInfo);
this.keySerializer = checkNotNull(keySerializer);
this.valueSerializer = checkNotNull(valueSerializer);
this.namespaceSerializer = checkNotNull(namespaceSerializer);
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
index bc793c8..c800d7a 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.state.changelog;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
@@ -33,8 +34,9 @@ class PriorityQueueStateChangeLoggerImpl<K, T> extends
AbstractStateChangeLogger
PriorityQueueStateChangeLoggerImpl(
TypeSerializer<T> serializer,
InternalKeyContext<K> keyContext,
- StateChangelogWriter<?> stateChangelogWriter) {
- super(stateChangelogWriter, keyContext);
+ StateChangelogWriter<?> stateChangelogWriter,
+ RegisteredPriorityQueueStateBackendMetaInfo<T> meta) {
+ super(stateChangelogWriter, keyContext, meta);
this.serializer = checkNotNull(serializer);
}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
new file mode 100644
index 0000000..bfea119
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/KvStateChangeLoggerImplTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.apache.flink.api.common.state.StateDescriptor.Type.VALUE;
+import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.MERGE_NS;
+
+/** {@link KvStateChangeLoggerImpl} test. */
+public class KvStateChangeLoggerImplTest extends
StateChangeLoggerTestBase<String> {
+
+ @Override
+ protected StateChangeLogger<String, String> getLogger(
+ TestingStateChangelogWriter writer, InternalKeyContextImpl<String>
keyContext) {
+ StringSerializer keySerializer = new StringSerializer();
+ StringSerializer nsSerializer = new StringSerializer();
+ StringSerializer valueSerializer = new StringSerializer();
+ RegisteredKeyValueStateBackendMetaInfo<String, String> metaInfo =
+ new RegisteredKeyValueStateBackendMetaInfo<>(
+ VALUE, "test", nsSerializer, valueSerializer);
+ return new KvStateChangeLoggerImpl<>(
+ keySerializer, nsSerializer, valueSerializer, keyContext,
writer, metaInfo);
+ }
+
+ @Override
+ protected String getNamespace(String element) {
+ return element;
+ }
+
+ @Override
+ protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+ StateChangeOperation op,
+ String element,
+ StateChangeLogger<String, String> logger,
+ InternalKeyContextImpl<String> keyContext)
+ throws IOException {
+ if (op == MERGE_NS) {
+ keyContext.setCurrentKey(element);
+ ((KvStateChangeLogger<String, String>) logger)
+ .namespacesMerged(element, Collections.emptyList());
+ return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(),
op));
+ } else {
+ return super.log(op, element, logger, keyContext);
+ }
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
new file mode 100644
index 0000000..b6bf280
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/PriorityQueueStateChangeLoggerImplTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import
org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.REMOVE_FIRST_ELEMENT;
+
+/** {@link PriorityQueueStateChangeLoggerImpl} test. */
+public class PriorityQueueStateChangeLoggerImplTest extends
StateChangeLoggerTestBase<Void> {
+
+ @Override
+ protected StateChangeLogger<String, Void> getLogger(
+ TestingStateChangelogWriter writer, InternalKeyContextImpl<String>
keyContext) {
+ StringSerializer valueSerializer = new StringSerializer();
+ RegisteredPriorityQueueStateBackendMetaInfo<String> metaInfo =
+ new RegisteredPriorityQueueStateBackendMetaInfo<>("test",
valueSerializer);
+ return new PriorityQueueStateChangeLoggerImpl<>(
+ valueSerializer, keyContext, writer, metaInfo);
+ }
+
+ @Override
+ protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+ StateChangeOperation op,
+ String element,
+ StateChangeLogger<String, Void> logger,
+ InternalKeyContextImpl<String> keyContext)
+ throws IOException {
+ if (op == REMOVE_FIRST_ELEMENT) {
+ keyContext.setCurrentKey(element);
+ ((PriorityQueueStateChangeLogger<String>)
logger).stateElementPolled();
+ return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(),
op));
+ } else {
+ return super.log(op, element, logger, keyContext);
+ }
+ }
+
+ @Override
+ protected Void getNamespace(String element) {
+ return null;
+ }
+}
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
new file mode 100644
index 0000000..fd79d86
--- /dev/null
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.runtime.state.heap.InternalKeyContextImpl;
+import
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.COMMON_KEY_GROUP;
+import static
org.apache.flink.state.changelog.AbstractStateChangeLogger.StateChangeOperation.METADATA;
+import static org.junit.Assert.assertEquals;
+
+abstract class StateChangeLoggerTestBase<Namespace> {
+ /** A basic test for appending the metadata on first state access. */
+ @Test
+ public void testMetadataOperationLogged() throws IOException {
+ TestingStateChangelogWriter writer = new TestingStateChangelogWriter();
+ InternalKeyContextImpl<String> keyContext =
+ new InternalKeyContextImpl<>(KeyGroupRange.of(1, 1000), 1000);
+ StateChangeLogger<String, Namespace> logger = getLogger(writer,
keyContext);
+
+ List<Tuple2<Integer, StateChangeOperation>> expectedAppends = new
ArrayList<>();
+ expectedAppends.add(Tuple2.of(COMMON_KEY_GROUP, METADATA));
+
+ // log every applicable operation, several times each
+ int numOpTypes = StateChangeOperation.values().length;
+ for (int i = 0; i < numOpTypes * 7; i++) {
+ String element = Integer.toString(i);
+ StateChangeOperation operation =
StateChangeOperation.byCode((byte) (i % numOpTypes));
+ log(operation, element, logger,
keyContext).ifPresent(expectedAppends::add);
+ }
+ assertEquals(expectedAppends, writer.appends);
+ }
+
+ protected abstract StateChangeLogger<String, Namespace> getLogger(
+ TestingStateChangelogWriter writer, InternalKeyContextImpl<String>
keyContext);
+
+ protected Optional<Tuple2<Integer, StateChangeOperation>> log(
+ StateChangeOperation op,
+ String element,
+ StateChangeLogger<String, Namespace> logger,
+ InternalKeyContextImpl<String> keyContext)
+ throws IOException {
+ keyContext.setCurrentKey(element);
+ Namespace namespace = getNamespace(element);
+ switch (op) {
+ case ADD:
+ logger.valueAdded(element, namespace);
+ break;
+ case ADD_ELEMENT:
+ logger.valueElementAdded(w -> {}, namespace);
+ break;
+ case REMOVE_ELEMENT:
+ logger.valueElementRemoved(w -> {}, namespace);
+ break;
+ case CLEAR:
+ logger.valueCleared(namespace);
+ break;
+ case SET:
+ logger.valueUpdated(element, namespace);
+ break;
+ case SET_INTERNAL:
+ logger.valueUpdatedInternal(element, namespace);
+ break;
+ case ADD_OR_UPDATE_ELEMENT:
+ logger.valueElementAddedOrUpdated(w -> {}, namespace);
+ break;
+ default:
+ return Optional.empty();
+ }
+ return Optional.of(Tuple2.of(keyContext.getCurrentKeyGroupIndex(),
op));
+ }
+
+ protected abstract Namespace getNamespace(String element);
+
+ @SuppressWarnings("rawtypes")
+ protected static class TestingStateChangelogWriter implements
StateChangelogWriter {
+ private final List<Tuple2<Integer, StateChangeOperation>> appends =
new ArrayList<>();
+
+ @Override
+ public void append(int keyGroup, byte[] value) {
+ appends.add(Tuple2.of(keyGroup,
StateChangeOperation.byCode(value[0])));
+ }
+
+ @Override
+ public SequenceNumber lastAppendedSequenceNumber() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<?> persist(SequenceNumber from) throws
IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void truncate(SequenceNumber to) {}
+
+ @Override
+ public void confirm(SequenceNumber from, SequenceNumber to) {}
+
+ @Override
+ public void reset(SequenceNumber from, SequenceNumber to) {}
+
+ @Override
+ public void close() {}
+ }
+}