[
https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16266992#comment-16266992
]
ASF GitHub Bot commented on FLINK-7873:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5074#discussion_r153241166
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/state/CachedCheckpointStreamFactory.java
---
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
+import org.apache.flink.runtime.checkpoint.CheckpointCache;
+import org.apache.flink.runtime.checkpoint.CheckpointCache.CachedOutputStream;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * {@link CachedCheckpointStreamFactory} is used to build an output stream that writes data to both remote end (e.g:DFS) and local end.
+ * Local data is managed by {@link CheckpointCache}. It simply wraps {@link CheckpointCache} and {@link CheckpointStreamFactory} and
+ * create a hybrid output stream by {@link CheckpointCache} and {@link CheckpointStreamFactory}, this hybrid output stream will write
+ * to both remote end and local end.
+ */
+public class CachedCheckpointStreamFactory implements CheckpointStreamFactory {
+
+ private static Logger LOG = LoggerFactory.getLogger(CachedCheckpointStreamFactory.class);
+
+ private final CheckpointCache cache;
+ private final CheckpointStreamFactory remoteFactory;
+
+ public CachedCheckpointStreamFactory(CheckpointCache cache, CheckpointStreamFactory factory) {
+ this.cache = cache;
+ this.remoteFactory = Preconditions.checkNotNull(factory, "Remote stream factory is null.");
+ }
+
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID) throws Exception {
+ return createCheckpointStateOutputStream(checkpointID, timestamp, handleID, false);
+ }
+
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp, StateHandleID handleID, boolean placeholder) throws Exception {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("create cache output stream: cpkID:{} placeHolder:{}", checkpointID, placeholder);
+ }
+ CachedOutputStream cachedOut = null;
+ if (cache != null) {
+ cachedOut = cache.createOutputStream(checkpointID, handleID, placeholder);
+ }
+ CheckpointStateOutputStream remoteOut = null;
+ if (!placeholder) {
+ remoteOut = remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+ }
+ CachedCheckpointStateOutputStream output = new CachedCheckpointStateOutputStream(cachedOut, remoteOut);
+ return output;
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ LOG.warn("create output stream which is not cacheable.");
+ return remoteFactory.createCheckpointStateOutputStream(checkpointID, timestamp);
+ }
+
+ @Override
+ public void close() throws Exception {
+ remoteFactory.close();
+ }
+
+ /**
+ * A hybrid checkpoint output stream which write data to both remote end and local end,
+ * writing data locally failed won't stop writing to remote. This hybrid output stream
+ * will return a {@link CachedStreamStateHandle} in closeAndGetHandle(), it can be used for read data locally.
+ */
+ public static class CachedCheckpointStateOutputStream extends CheckpointStateOutputStream {
+
+ private CachedOutputStream cacheOut = null;
+ private CheckpointStateOutputStream remoteOut = null;
+
+ public CachedCheckpointStateOutputStream(CachedOutputStream cacheOut, CheckpointStateOutputStream remoteOut) {
+ this.cacheOut = cacheOut;
+ this.remoteOut = remoteOut;
+ }
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ if (cacheOut != null) {
+ // finalize cache data
+ StateHandleID cacheId = cacheOut.getCacheID();
+ cacheOut.end();
+
+ StreamStateHandle remoteHandle;
+ if (remoteOut != null) {
+ remoteHandle = remoteOut.closeAndGetHandle();
+ } else {
+ remoteHandle = new PlaceholderStreamStateHandle(cacheId);
+ }
+ return new CachedStreamStateHandle(cacheId, remoteHandle);
+ } else {
+ if (remoteOut != null) {
+ return remoteOut.closeAndGetHandle();
+ } else {
+ return null;
+ }
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return remoteOut != null ? remoteOut.getPos() :-1L;
--- End diff --
This can be a bit dangerous. Some code may rely on certain offsets in the
file, so both streams, `cachedOut` and `remoteOut`, should have aligned
offsets before we start writing. This should at least be checked in a
precondition, because a mismatch could lead to subtle, hard-to-debug bugs.
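To illustrate the kind of precondition meant here, the following is a minimal, self-contained sketch. It does not use the Flink classes from the diff: `PositionedOutputStream`, `InMemoryPositionedStream`, and `AlignedDualOutputStream` are hypothetical stand-ins invented for this example, and the sketch assumes both streams are non-null, which is stricter than the PR, where either stream may be absent.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical stand-in for a stream that can report its write position. */
abstract class PositionedOutputStream extends OutputStream {
    abstract long getPos() throws IOException;
}

/** In-memory positioned stream, used only to make the sketch runnable. */
class InMemoryPositionedStream extends PositionedOutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    @Override
    long getPos() {
        return buffer.size();
    }
}

/** Writes each byte to both streams; refuses to start if their offsets differ. */
class AlignedDualOutputStream extends PositionedOutputStream {
    private final PositionedOutputStream local;
    private final PositionedOutputStream remote;

    AlignedDualOutputStream(PositionedOutputStream local, PositionedOutputStream remote)
            throws IOException {
        if (local == null || remote == null) {
            throw new NullPointerException("Both streams are required in this sketch.");
        }
        // The precondition: offsets must agree before the first write, so that
        // a position reported by one stream is also valid for the other.
        long localPos = local.getPos();
        long remotePos = remote.getPos();
        if (localPos != remotePos) {
            throw new IllegalStateException(
                "Stream offsets are not aligned: local=" + localPos + ", remote=" + remotePos);
        }
        this.local = local;
        this.remote = remote;
    }

    @Override
    public void write(int b) throws IOException {
        remote.write(b);
        local.write(b);
    }

    @Override
    long getPos() throws IOException {
        // Safe to report either position, because both advance in lockstep.
        return remote.getPos();
    }
}
```

With a check like this, a caller that records `getPos()` as an offset into the remote file can expect the same offset to be meaningful in the local copy, as long as every write succeeds on both streams.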
> Introduce CheckpointCacheManager for reading checkpoint data locally when
> performing failover
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> Why I introduce this:
> The current recovery strategy always reads checkpoint data from the remote
> file system (HDFS). This costs a lot of bandwidth when the state is very
> large (e.g. 1 TB). Worse, if the job recovers again and again, it can eat up
> all the network bandwidth and badly hurt the cluster. So I propose that we
> cache the checkpoint data locally and read it from the local cache whenever
> we can, falling back to the remote copy only if the local read fails. The
> advantage is that if an execution is assigned to the same TaskManager as
> before, it saves a lot of bandwidth and recovers faster.
> Solution:
> The TaskManager does the caching and manages the cached data itself. It
> simply uses a TTL-like method to dispose of cache entries: we dispose of an
> entry if it has not been touched for a time X, and once we touch an entry we
> reset its TTL. In this way, all the work is done by the TaskManager and is
> transparent to the JobManager. The only problem is that we may dispose of an
> entry that is still useful; in that case we have to read from the remote
> data after all, but users can avoid this by setting a proper TTL value
> according to the checkpoint interval and other factors.
> Can someone give me some advice? I would appreciate it very much.
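The TTL-like disposal described in the issue could look roughly like the sketch below. It is only an illustration under stated assumptions: `TtlCacheRegistry` and its methods are hypothetical names that do not come from the pull request, the registry tracks only last-touch timestamps rather than real cached files, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical TTL registry; names are illustrative, not taken from the PR. */
class TtlCacheRegistry<K> {
    private final long ttlMillis;
    private final Map<K, Long> lastTouched = new ConcurrentHashMap<>();

    TtlCacheRegistry(long ttlMillis) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL must be positive.");
        }
        this.ttlMillis = ttlMillis;
    }

    /** Registers an entry or touches an existing one, which resets its TTL. */
    void touch(K key, long nowMillis) {
        lastTouched.put(key, nowMillis);
    }

    boolean contains(K key) {
        return lastTouched.containsKey(key);
    }

    /** Disposes of entries not touched for longer than the TTL; returns how many. */
    int expire(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<K, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

In this sketch an entry that is read during recovery would be touched and so survives the next expiry pass, while an entry that is never touched again is dropped, at which point a reader would have to fall back to the remote copy.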