curcur commented on a change in pull request #16606:
URL: https://github.com/apache/flink/pull/16606#discussion_r683190222
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
##########
@@ -610,6 +612,10 @@ public void testSharedIncrementalStateDeRegistration()
throws Exception {
}
}
+ @Override
+ @Test
+ public void testMaterializedRestore() {}
+
Review comment:
The delegated state backend does not have materialization; that's why
the test is ignored.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
+
+ private long materializedId;
+
+ private MaterializedState materializedState;
+
+ public PeriodicMaterializer(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ Snapshotable keyedStateBackend,
+ CheckpointStorageWorkerView checkpointStorageWorkerView,
+ StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
+ AsyncExceptionHandler asyncExceptionHandler,
+ MaterializedState materializedState,
+ long periodicMaterializeInitDelay,
+ long periodicMaterializeInterval,
+ int allowedNumberOfFailures,
+ boolean materializationEnabled) {
+ this.mailboxExecutor = mailboxExecutor;
+ this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+ this.keyedStateBackend = keyedStateBackend;
+ this.stateChangelogWriter = stateChangelogWriter;
+ this.asyncExceptionHandler = asyncExceptionHandler;
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("periodic-materialization"));
+ this.streamFactory = shared ->
checkpointStorageWorkerView.createTaskOwnedStateStream();
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.materializationOnGoing = new AtomicBoolean(false);
+ this.retries = new AtomicInteger(allowedNumberOfFailures);
+
+ this.materializedId = 0;
+ this.materializedState = materializedState;
+
+ if (materializationEnabled) {
+ this.periodicExecutor.scheduleAtFixedRate(
+ this::triggerMaterialization,
+ periodicMaterializeInitDelay,
+ periodicMaterializeInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ // Only one materialization ongoing at a time
+ if (!materializationOnGoing.compareAndSet(false, true)) {
+ return;
+ }
+
+ SequenceNumber upTo =
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+ if (upTo.equals(materializedState.lastMaterializedTo())) {
+ return;
+ }
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture =
+ keyedStateBackend.snapshot(
+ materializedId++, // TO DISCUSS: seems
this does not matter,
+ // based on
+ System.currentTimeMillis(),
+ streamFactory,
+ CHECKPOINT_OPTIONS);
+
+ // TODO: add metadata to log FLINK-23170
+ asyncOperationsThreadPool.execute(
+ () ->
asyncMaterialization(materializedRunnableFuture, upTo));
+ },
+ "materialization");
+ }
+
+ private void asyncMaterialization(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
+ mailboxExecutor.execute(
+ () -> {
+ materializedState =
+ new MaterializedState(
+
getMaterializedResult(materializedRunnableFuture.get()),
+ new ArrayList<>(),
+ upTo);
+
+ checkState(
+ materializationOnGoing.compareAndSet(true,
false),
Review comment:
That's a good idea, info level should also be fine I think.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
+
+ private long materializedId;
+
+ private MaterializedState materializedState;
+
+ public PeriodicMaterializer(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ Snapshotable keyedStateBackend,
+ CheckpointStorageWorkerView checkpointStorageWorkerView,
+ StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
+ AsyncExceptionHandler asyncExceptionHandler,
+ MaterializedState materializedState,
+ long periodicMaterializeInitDelay,
+ long periodicMaterializeInterval,
+ int allowedNumberOfFailures,
+ boolean materializationEnabled) {
+ this.mailboxExecutor = mailboxExecutor;
+ this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+ this.keyedStateBackend = keyedStateBackend;
+ this.stateChangelogWriter = stateChangelogWriter;
+ this.asyncExceptionHandler = asyncExceptionHandler;
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("periodic-materialization"));
+ this.streamFactory = shared ->
checkpointStorageWorkerView.createTaskOwnedStateStream();
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.materializationOnGoing = new AtomicBoolean(false);
+ this.retries = new AtomicInteger(allowedNumberOfFailures);
+
+ this.materializedId = 0;
+ this.materializedState = materializedState;
+
+ if (materializationEnabled) {
+ this.periodicExecutor.scheduleAtFixedRate(
+ this::triggerMaterialization,
+ periodicMaterializeInitDelay,
+ periodicMaterializeInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ // Only one materialization ongoing at a time
+ if (!materializationOnGoing.compareAndSet(false, true)) {
+ return;
+ }
+
+ SequenceNumber upTo =
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+ if (upTo.equals(materializedState.lastMaterializedTo())) {
+ return;
+ }
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture =
+ keyedStateBackend.snapshot(
+ materializedId++, // TO DISCUSS: seems
this does not matter,
+ // based on
+ System.currentTimeMillis(),
+ streamFactory,
+ CHECKPOINT_OPTIONS);
+
+ // TODO: add metadata to log FLINK-23170
+ asyncOperationsThreadPool.execute(
+ () ->
asyncMaterialization(materializedRunnableFuture, upTo));
+ },
+ "materialization");
+ }
+
+ private void asyncMaterialization(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
+ mailboxExecutor.execute(
+ () -> {
+ materializedState =
+ new MaterializedState(
+
getMaterializedResult(materializedRunnableFuture.get()),
+ new ArrayList<>(),
+ upTo);
+
+ checkState(
+ materializationOnGoing.compareAndSet(true,
false),
Review comment:
That's a good idea, info level should also be fine I think. Let's keep
it as debug for now.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
Review comment:
Yes, I was going back and forth on whether this flag should only be touched
in the task thread. For now, it can be non-atomic.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
+
+ private long materializedId;
+
+ private MaterializedState materializedState;
+
+ public PeriodicMaterializer(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ Snapshotable keyedStateBackend,
+ CheckpointStorageWorkerView checkpointStorageWorkerView,
+ StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
+ AsyncExceptionHandler asyncExceptionHandler,
+ MaterializedState materializedState,
+ long periodicMaterializeInitDelay,
+ long periodicMaterializeInterval,
+ int allowedNumberOfFailures,
+ boolean materializationEnabled) {
+ this.mailboxExecutor = mailboxExecutor;
+ this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+ this.keyedStateBackend = keyedStateBackend;
+ this.stateChangelogWriter = stateChangelogWriter;
+ this.asyncExceptionHandler = asyncExceptionHandler;
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("periodic-materialization"));
+ this.streamFactory = shared ->
checkpointStorageWorkerView.createTaskOwnedStateStream();
+ this.allowedNumberOfFailures = allowedNumberOfFailures;
+ this.materializationOnGoing = new AtomicBoolean(false);
+ this.retries = new AtomicInteger(allowedNumberOfFailures);
+
+ this.materializedId = 0;
+ this.materializedState = materializedState;
+
+ if (materializationEnabled) {
+ this.periodicExecutor.scheduleAtFixedRate(
+ this::triggerMaterialization,
+ periodicMaterializeInitDelay,
+ periodicMaterializeInterval,
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ mailboxExecutor.execute(
+ () -> {
+ // Only one materialization ongoing at a time
+ if (!materializationOnGoing.compareAndSet(false, true)) {
+ return;
+ }
+
+ SequenceNumber upTo =
stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+ if (upTo.equals(materializedState.lastMaterializedTo())) {
+ return;
+ }
+
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture =
+ keyedStateBackend.snapshot(
+ materializedId++, // TO DISCUSS: seems
this does not matter,
+ // based on
+ System.currentTimeMillis(),
+ streamFactory,
+ CHECKPOINT_OPTIONS);
+
+ // TODO: add metadata to log FLINK-23170
+ asyncOperationsThreadPool.execute(
+ () ->
asyncMaterialization(materializedRunnableFuture, upTo));
+ },
+ "materialization");
+ }
+
+ private void asyncMaterialization(
+ RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
+ SequenceNumber upTo) {
+
+ FileSystemSafetyNet.initializeSafetyNetForThread();
+ try {
+ FutureUtils.runIfNotDoneAndGet(materializedRunnableFuture);
Review comment:
But when exceptions happen, we also need to close the safety net; that's
why it is put within `finally`.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
Review comment:
Yes, I was going back and forth on whether this flag should only be touched
in the task thread. For now, it can be non-atomic.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(PeriodicMaterializer.class);
+
+ /**
+ * ChangelogStateBackend only supports CheckpointType.CHECKPOINT; The rest
of information in
+ * CheckpointOptions is not used in Snapshotable#snapshot(). More details
in FLINK-23441.
+ */
+ private static final CheckpointOptions CHECKPOINT_OPTIONS =
+ new CheckpointOptions(
+ CheckpointType.CHECKPOINT,
CheckpointStorageLocationReference.getDefault());
+
+ /** task mailbox executor, execute from Task Thread. */
+ private final MailboxExecutor mailboxExecutor;
+
+ /** Async thread pool, to complete async phase of materialization. */
+ private final ExecutorService asyncOperationsThreadPool;
+
+ /** scheduled executor, periodically trigger materialization. */
+ private final ScheduledExecutorService periodicExecutor;
+
+ private final CheckpointStreamFactory streamFactory;
+
+ private final Snapshotable keyedStateBackend;
+
+ private final StateChangelogWriter<ChangelogStateHandle>
stateChangelogWriter;
+
+ private final AsyncExceptionHandler asyncExceptionHandler;
+
+ private final int allowedNumberOfFailures;
+
+ /** Materialization failure retries. */
+ private final AtomicInteger retries;
+
+ /** Making sure only one materialization on going at a time. */
+ private final AtomicBoolean materializationOnGoing;
+
+ private long materializedId;
+
+ private MaterializedState materializedState;
+
+ public PeriodicMaterializer(
+ MailboxExecutor mailboxExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ Snapshotable keyedStateBackend,
+ CheckpointStorageWorkerView checkpointStorageWorkerView,
+ StateChangelogWriter<ChangelogStateHandle> stateChangelogWriter,
+ AsyncExceptionHandler asyncExceptionHandler,
+ MaterializedState materializedState,
+ long periodicMaterializeInitDelay,
+ long periodicMaterializeInterval,
+ int allowedNumberOfFailures,
+ boolean materializationEnabled) {
+ this.mailboxExecutor = mailboxExecutor;
+ this.asyncOperationsThreadPool = asyncOperationsThreadPool;
+ this.keyedStateBackend = keyedStateBackend;
+ this.stateChangelogWriter = stateChangelogWriter;
+ this.asyncExceptionHandler = asyncExceptionHandler;
+ this.periodicExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ExecutorThreadFactory("periodic-materialization"));
Review comment:
Yes, I think adding the task name is a good idea!
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -329,37 +332,47 @@ public boolean
deregisterKeySelectionListener(KeySelectionListener<K> listener)
// materialization may truncate only a part of the previous result and
the backend would
// have to split it somehow for the former option, so the latter is
used.
lastCheckpointId = checkpointId;
- lastUploadedFrom = materializedTo;
+ lastUploadedFrom =
periodicMaterializer.getMaterializedState().lastMaterializedTo();
lastUploadedTo =
stateChangelogWriter.lastAppendedSequenceNumber().next();
LOG.debug(
"snapshot for checkpoint {}, change range: {}..{}",
checkpointId,
lastUploadedFrom,
lastUploadedTo);
+
+ MaterializedState materializedStateCopy =
periodicMaterializer.getMaterializedState();
+
return toRunnableFuture(
stateChangelogWriter
.persist(lastUploadedFrom)
- .thenApply(this::buildSnapshotResult));
+ .thenApply(delta -> buildSnapshotResult(delta,
materializedStateCopy)));
}
- private SnapshotResult<KeyedStateHandle>
buildSnapshotResult(ChangelogStateHandle delta) {
- // Can be called by either task thread during the sync checkpoint
phase (if persist future
- // was already completed); or by the writer thread otherwise. So need
to synchronize.
- // todo: revisit after FLINK-21357 - use mailbox action?
- synchronized (materialized) {
- // collections don't change once started and handles are immutable
- List<ChangelogStateHandle> prevDeltaCopy = new
ArrayList<>(restoredNonMaterialized);
- if (delta != null && delta.getStateSize() > 0) {
- prevDeltaCopy.add(delta);
- }
- if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
- return SnapshotResult.empty();
- } else {
- return SnapshotResult.of(
- new ChangelogStateBackendHandleImpl(
- materialized, prevDeltaCopy,
getKeyGroupRange()));
- }
+ @Override
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ periodicMaterializer.triggerMaterialization();
+ }
Review comment:
:-)
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -329,37 +332,47 @@ public boolean
deregisterKeySelectionListener(KeySelectionListener<K> listener)
// materialization may truncate only a part of the previous result and
the backend would
// have to split it somehow for the former option, so the latter is
used.
lastCheckpointId = checkpointId;
- lastUploadedFrom = materializedTo;
+ lastUploadedFrom =
periodicMaterializer.getMaterializedState().lastMaterializedTo();
lastUploadedTo =
stateChangelogWriter.lastAppendedSequenceNumber().next();
LOG.debug(
"snapshot for checkpoint {}, change range: {}..{}",
checkpointId,
lastUploadedFrom,
lastUploadedTo);
+
+ MaterializedState materializedStateCopy =
periodicMaterializer.getMaterializedState();
+
return toRunnableFuture(
stateChangelogWriter
.persist(lastUploadedFrom)
- .thenApply(this::buildSnapshotResult));
+ .thenApply(delta -> buildSnapshotResult(delta,
materializedStateCopy)));
}
- private SnapshotResult<KeyedStateHandle>
buildSnapshotResult(ChangelogStateHandle delta) {
- // Can be called by either task thread during the sync checkpoint
phase (if persist future
- // was already completed); or by the writer thread otherwise. So need
to synchronize.
- // todo: revisit after FLINK-21357 - use mailbox action?
- synchronized (materialized) {
- // collections don't change once started and handles are immutable
- List<ChangelogStateHandle> prevDeltaCopy = new
ArrayList<>(restoredNonMaterialized);
- if (delta != null && delta.getStateSize() > 0) {
- prevDeltaCopy.add(delta);
- }
- if (prevDeltaCopy.isEmpty() && materialized.isEmpty()) {
- return SnapshotResult.empty();
- } else {
- return SnapshotResult.of(
- new ChangelogStateBackendHandleImpl(
- materialized, prevDeltaCopy,
getKeyGroupRange()));
- }
+ @Override
+ @VisibleForTesting
+ public void triggerMaterialization() {
+ periodicMaterializer.triggerMaterialization();
+ }
Review comment:
:-), agree.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##########
@@ -127,13 +128,29 @@
private boolean forceAvro = false;
private long autoWatermarkInterval = 200;
+ // ---------- statebackend related configurations
------------------------------
/**
* Interval in milliseconds for sending latency tracking marks from the
sources to the sinks.
*/
private long latencyTrackingInterval =
MetricOptions.LATENCY_INTERVAL.defaultValue();
private boolean isLatencyTrackingConfigured = false;
+ /** Interval in milliseconds to perform periodic materialization. */
+ private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+ /** Interval in milliseconds for initial delay of periodic
materialization. */
+ private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
+
+ /** Max allowed number of failures */
+ private int materializationMaxAllowedFailures =
+
StateBackendOptions.MATERIALIZATION_MAX_ALLOWED_FAILURES.defaultValue();
+
+ /** Flag to enable periodic materialization */
+ private boolean isPeriodicMaterializationEnabled = false;
Review comment:
I was thinking of it for testing purposes.
Not for the unit tests, but for the randomization test, when we only want to
test the changelog itself, without materialization.
WDYT?
##########
File path:
flink-core/src/main/java/org/apache/flink/configuration/StateBackendOptions.java
##########
@@ -88,4 +88,35 @@
.defaultValue(128)
.withDescription(
"Defines the number of measured latencies to
maintain at each state access operation.");
+
+
@Documentation.Section(Documentation.Sections.STATE_BACKEND_MATERIALIZATION)
+ public static final ConfigOption<Long> PERIODIC_MATERIALIZATION_INTERVAL =
+ ConfigOptions.key("state.backend.periodic-materialize.interval")
+ .longType()
+ .defaultValue(10000L)
+ .withDescription(
+ "Defines the interval in milliseconds to perform
periodic materialization for state backend.");
Review comment:
I am thinking of creating a separate ChangelogOptions class for all such
configurations.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
##########
@@ -127,13 +128,29 @@
private boolean forceAvro = false;
private long autoWatermarkInterval = 200;
+ // ---------- statebackend related configurations
------------------------------
/**
* Interval in milliseconds for sending latency tracking marks from the
sources to the sinks.
*/
private long latencyTrackingInterval =
MetricOptions.LATENCY_INTERVAL.defaultValue();
private boolean isLatencyTrackingConfigured = false;
+ /** Interval in milliseconds to perform periodic materialization. */
+ private long periodicMaterializeInterval =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INTERVAL.defaultValue();
+
+ /** Interval in milliseconds for initial delay of periodic
materialization. */
+ private long periodicMaterializeInitDelay =
+
StateBackendOptions.PERIODIC_MATERIALIZATION_INIT_DELAY.defaultValue();
Review comment:
That's the init delay. I can remove it if you think it is not necessary.
I do not have a strong opinion.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/PeriodicMaterializer.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FileSystemSafetyNet;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
+import org.apache.flink.runtime.mailbox.MailboxExecutor;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
+import org.apache.flink.runtime.state.changelog.SequenceNumber;
+import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Periodically make materialization for the delegated state backend. */
+public class PeriodicMaterializer {
Review comment:
I actually think `PeriodicMaterializer` is a good name; it is clear
what it is and how it achieves this.
Maybe later we will have different Materializers as well.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/AsyncExceptionHandler.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.runtime.execution.Environment;
+
+/** Handling exceptions thrown by non-task thread. */
+public class AsyncExceptionHandler {
Review comment:
Yes, should reuse it.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/AsyncExceptionHandler.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.runtime.execution.Environment;
+
+/** Handling exceptions thrown by non-task thread. */
+public class AsyncExceptionHandler {
Review comment:
Yes, should reuse it.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/materializer/AsyncExceptionHandler.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.changelog.materializer;
+
+import org.apache.flink.runtime.execution.Environment;
+
+/** Handling exceptions thrown by non-task thread. */
+public class AsyncExceptionHandler {
Review comment:
The main reason I did not reuse `StreamTaskAsyncExceptionHandler` is
that both the interface and the implementation are in the
"flink-streaming-java" module.
I can either pull them out (to runtime, for example) or make the changelog
depend on flink-streaming-java (adding the dependency has some problems, but I
do not remember exactly what they are).
I also think we should not duplicate code. But on the other hand, we may end
up with different ways of handling async exceptions. So in that sense, it might
also be fine.
=====
Discussed offline: I will pull the interface out.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]