GitHub user makeyang commented on a diff in the pull request:
https://github.com/apache/flink/pull/5908#discussion_r187297365
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
---
@@ -395,36 +402,102 @@ public final OperatorSnapshotFutures
snapshotState(long checkpointId, long times
*
* @param context context that provides information and means required
for taking a snapshot
*/
- public void snapshotState(StateSnapshotContext context) throws
Exception {
+ public void snapshotState(StateSnapshotContext context,
OperatorSnapshotFutures snapshotInProgress) throws Exception {
if (getKeyedStateBackend() != null) {
KeyedStateCheckpointOutputStream out;
-
+ OperatorStateCheckpointOutputStream metaOut;
try {
out = context.getRawKeyedOperatorStateOutput();
} catch (Exception exception) {
throw new Exception("Could not open raw keyed
operator state stream for " +
getOperatorName() + '.', exception);
}
-
try {
- KeyGroupsList allKeyGroups =
out.getKeyGroupList();
- for (int keyGroupIdx : allKeyGroups) {
- out.startNewKeyGroup(keyGroupIdx);
-
-
timeServiceManager.snapshotStateForKeyGroup(
- new
DataOutputViewStreamWrapper(out), keyGroupIdx);
- }
+ metaOut =
context.getRawKeyedOperatorStateMetaOutput();
} catch (Exception exception) {
- throw new Exception("Could not write timer
service of " + getOperatorName() +
- " to checkpoint state stream.",
exception);
- } finally {
- try {
- out.close();
- } catch (Exception closeException) {
- LOG.warn("Could not close raw keyed
operator state stream for {}. This " +
- "might have prevented deleting
some state data.", getOperatorName(), closeException);
- }
+ throw new Exception("Could not open raw
operator state stream for " +
+ getOperatorName() + '.', exception);
}
+ final Tuple4<Integer, Map<String,
HeapInternalTimerService>, Integer, TreeSet<Integer>> ret =
timeServiceManager.startOneSnapshotState();
+ final int currentSnapshotVersion = ret.f0;
+ final Map<String, HeapInternalTimerService>
timerServices = ret.f1;
+ final Integer stateTableVersion = ret.f2;
+ final TreeSet<Integer> snapshotVersions = ret.f3;
+ LOG.info("snapshotVersions after calling
startOneSnapshotState:" + snapshotVersions.toString());
+ Callable<Boolean> snapshotTimerCallable = new
Callable() {
+ @Override
+ public Boolean call() {
+ try {
+ KeyGroupsList allKeyGroups =
out.getKeyGroupList();
+ metaOut.startNewPartition();
+ DataOutputViewStreamWrapper
metaWrapper = new DataOutputViewStreamWrapper(metaOut);
+
metaWrapper.writeInt(stateTableVersion);
+ if (snapshotVersions.size() >
0) {
+
metaWrapper.writeInt(snapshotVersions.size());
+ for (Integer i :
snapshotVersions) {
+
metaWrapper.writeInt(i);
+ }
+ }
+ else {
+ metaWrapper.writeInt(0);
+ }
+ int keyGroupCount =
allKeyGroups.getNumberOfKeyGroups();
+
metaWrapper.writeInt(keyGroupCount);
+ for (int keyGroupIdx :
allKeyGroups) {
+
out.startNewKeyGroup(keyGroupIdx);
+
metaWrapper.writeInt(keyGroupIdx);
+
InternalTimerServiceSerializationProxy serializationProxy =
+ new
InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
+
currentSnapshotVersion, timeServiceManager, metaWrapper);
+
+
serializationProxy.write(new DataOutputViewStreamWrapper(out));
+
+ }
+ LOG.info("return Tuple4 and
snapshotVersions:" + snapshotVersions.toString());
+ return true;
+ } catch (Exception exception) {
+ LOG.error("Could not write
timer service of " + getOperatorName() +
+ " to checkpoint state
stream.", exception);
+ return false;
+ } finally {
+
timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
+
StateSnapshotContextSynchronousImpl snapshotContext =
(StateSnapshotContextSynchronousImpl) context;
+ try {
+
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
+ } catch (IOException e) {
+
LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
+ return false;
--- End diff --
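How about changing the callable's return type from `Boolean` to a tuple that also carries the `Throwable` when an exception happens? Currently the exception is only logged and `false` is returned, so the caller cannot see or rethrow the actual cause.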
---