[FLINK-5096] Make the RollingSink rescalable. Integrates the RollingSink with the new state abstractions so that its parallelism can change after resuming execution from a savepoint.
This closes #2845. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2b93f69 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2b93f69 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2b93f69 Branch: refs/heads/master Commit: e2b93f69cea0c8a8d05591285cb9c2f4dc64c619 Parents: 333da3a Author: kl0u <[email protected]> Authored: Thu Nov 10 16:57:20 2016 +0100 Committer: zentol <[email protected]> Committed: Sun Nov 27 12:49:54 2016 +0100 ---------------------------------------------------------------------- .../streaming/connectors/fs/RollingSink.java | 305 +++++++++---------- .../fs/RollingSinkFaultTolerance2ITCase.java | 292 ------------------ .../fs/RollingSinkFaultToleranceITCase.java | 29 +- .../connectors/fs/RollingSinkITCase.java | 245 ++++++++++++++- 4 files changed, 404 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java index b959bf8..fc4a35e 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,19 +19,22 @@ package org.apache.flink.streaming.connectors.fs; import org.apache.commons.lang3.time.StopWatch; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +45,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,7 +53,7 @@ import java.util.UUID; /** * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This - * is itegrated with the checkpointing mechanism to provide exactly once semantics. 
+ * is integrated with the checkpointing mechanism to provide exactly once semantics. * * <p> * When creating the sink a {@code basePath} must be specified. The base directory contains @@ -124,7 +127,9 @@ import java.util.UUID; * @deprecated use {@link BucketingSink} instead. */ @Deprecated -public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConfigurable, Checkpointed<RollingSink.BucketState>, CheckpointListener { +public class RollingSink<T> extends RichSinkFunction<T> + implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener { + private static final long serialVersionUID = 1L; private static Logger LOG = LoggerFactory.getLogger(RollingSink.class); @@ -136,9 +141,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf // These are initialized with some defaults but are meant to be changeable by the user /** - * The default maximum size of part files. - * - * 6 times the default block size + * The default maximum size of part files (currently {@code 384 MB}). */ private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L; @@ -189,7 +192,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf /** - * The base {@code Path} that stored all rolling bucket directories. + * The base {@code Path} that stores all bucket directories. */ private final String basePath; @@ -215,15 +218,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf */ private long batchSize; - /** - * If this is true we remove any leftover in-progress/pending files when the sink is opened. - * - * <p> - * This should only be set to false if using the sink without checkpoints, to not remove - * the files already in the directory. 
- */ - private boolean cleanupOnOpen = true; - // These are the actually configured prefixes/suffixes private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX; private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX; @@ -258,11 +252,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf private transient Path currentBucketDirectory; /** - * Our subtask index, retrieved from the {@code RuntimeContext} in {@link #open}. - */ - private transient int subtaskIndex; - - /** * For counting the part files inside a bucket directory. Part files follow the patter * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter. */ @@ -271,7 +260,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf /** * Tracks if the writer is currently opened or closed. */ - private transient boolean isWriterOpen = false; + private transient boolean isWriterOpen; /** * We use reflection to get the .truncate() method, this is only available starting with @@ -285,10 +274,12 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf */ private transient BucketState bucketState; + private transient ListState<BucketState> restoredBucketStates; + /** * User-defined FileSystem parameters. */ - private Configuration fsConfig = null; + private Configuration fsConfig; /** * The FileSystem reference. 
@@ -328,7 +319,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf this.fsConfig = new Configuration(); for(Map.Entry<String, String> entry : config) { fsConfig.setString(entry.getKey(), entry.getValue()); - }; + } return this; } @@ -341,63 +332,57 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + Preconditions.checkArgument(this.restoredBucketStates == null, + "The " + getClass().getSimpleName() + " has already been initialized."); + + initFileSystem(); + + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + OperatorStateStore stateStore = context.getOperatorStateStore(); + restoredBucketStates = stateStore.getSerializableListState("rolling-states"); + + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); + + for (BucketState bucketState : restoredBucketStates.get()) { + handleRestoredBucketState(bucketState); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState); + } + } else { + LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex); + } + } + + @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); partCounter = 0; this.writer = writerTemplate.duplicate(); - if (bucketState == null) { - bucketState = new BucketState(); - } - - initFileSystem(); - refTruncate = reflectTruncate(fs); - - // delete pending/in-progress files that might be left if we fail while - // no checkpoint has yet been done - try { - if (fs.exists(new Path(basePath)) && cleanupOnOpen) { - RemoteIterator<LocatedFileStatus> bucketFiles = 
fs.listFiles(new Path(basePath), true); - - while (bucketFiles.hasNext()) { - LocatedFileStatus file = bucketFiles.next(); - if (file.getPath().toString().endsWith(pendingSuffix)) { - // only delete files that contain our subtask index - if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) { - LOG.debug("(OPEN) Deleting leftover pending file {}", file.getPath().toString()); - fs.delete(file.getPath(), true); - } - } - if (file.getPath().toString().endsWith(inProgressSuffix)) { - // only delete files that contain our subtask index - if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) { - LOG.debug("(OPEN) Deleting leftover in-progress file {}", file.getPath().toString()); - fs.delete(file.getPath(), true); - } - } - } - } - } catch (IOException e) { - LOG.error("Error while deleting leftover pending/in-progress files: {}", e); - throw new RuntimeException("Error while deleting leftover pending/in-progress files.", e); - } + bucketState = new BucketState(); } /** - * create a file system with the user defined hdfs config + * Create a file system with the user-defined hdfs config * @throws IOException */ private void initFileSystem() throws IOException { - if(fs != null) { + if (fs != null) { return; } org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - if(fsConfig != null) { - String disableCacheName - = String.format("fs.%s.impl.disable.cache", new Object[]{new Path(basePath).toUri().getScheme()}); + if (fsConfig != null) { + String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); hadoopConf.setBoolean(disableCacheName, true); for (String key : fsConfig.keySet()) { hadoopConf.set(key, fsConfig.getString(key, null)); @@ -409,21 +394,14 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf @Override public void close() throws Exception { -// boolean interrupted = Thread.interrupted(); 
closeCurrentPartFile(); - -// if (interrupted) { -// Thread.currentThread().interrupt(); -// } } @Override public void invoke(T value) throws Exception { - if (shouldRoll()) { openNewPartFile(); } - writer.write(value); } @@ -436,6 +414,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf */ private boolean shouldRoll() throws IOException { boolean shouldRoll = false; + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); if (!isWriterOpen) { shouldRoll = true; LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex); @@ -482,12 +461,14 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } } - + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter); // This should work since there is only one parallel subtask that tries names with // our subtask id. Otherwise we would run into concurrency issues here. 
- while (fs.exists(currentPartPath) || fs.exists(new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix))) { + while (fs.exists(currentPartPath) || + fs.exists(getPendingPathFor(currentPartPath)) || + fs.exists(getInProgressPathFor(currentPartPath))) { partCounter++; currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter); } @@ -497,11 +478,23 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf LOG.debug("Next part path is {}", currentPartPath.toString()); - Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); + Path inProgressPath = getInProgressPathFor(currentPartPath); writer.open(fs, inProgressPath); isWriterOpen = true; } + private Path getPendingPathFor(Path path) { + return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); + } + + private Path getInProgressPathFor(Path path) { + return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); + } + + private Path getValidLengthPathFor(Path path) { + return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); + } + /** * Closes the current part file. 
* @@ -516,25 +509,22 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } if (currentPartPath != null) { - Path inProgressPath = new Path(currentPartPath.getParent(), inProgressPrefix + currentPartPath.getName()).suffix(inProgressSuffix); - Path pendingPath = new Path(currentPartPath.getParent(), pendingPrefix + currentPartPath.getName()).suffix(pendingSuffix); + Path inProgressPath = getInProgressPathFor(currentPartPath); + Path pendingPath = getPendingPathFor(currentPartPath); fs.rename(inProgressPath, pendingPath); - LOG.debug("Moving in-progress bucket {} to pending file {}", - inProgressPath, - pendingPath); + LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath); this.bucketState.pendingFiles.add(currentPartPath.toString()); } } /** * Gets the truncate() call using reflection. - * * <p> - * Note: This code comes from Flume + * <b>NOTE: </b>This code comes from Flume */ private Method reflectTruncate(FileSystem fs) { Method m = null; - if(fs != null) { + if (fs != null) { Class<?> fsClass = fs.getClass(); try { m = fsClass.getMethod("truncate", Path.class, long.class); @@ -544,7 +534,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf return null; } - // verify that truncate actually works FSDataOutputStream outputStream; Path testPath = new Path(UUID.randomUUID().toString()); @@ -557,7 +546,6 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf throw new RuntimeException("Could not create file for checking if truncate works.", e); } - try { m.invoke(fs, testPath, 2); } catch (IllegalAccessException | InvocationTargetException e) { @@ -575,75 +563,75 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf return m; } - @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (bucketState.pendingFilesPerCheckpoint) { - Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet(); - Set<Long> checkpointsToRemove = new HashSet<>(); - for (Long pastCheckpointId : pastCheckpointIds) { + Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt = + bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); + + while (pendingCheckpointsIt.hasNext()) { + Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next(); + Long pastCheckpointId = entry.getKey(); + if (pastCheckpointId <= checkpointId) { LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); // All the pending files are buckets that have been completed but are waiting to be renamed // to their final name - for (String filename : bucketState.pendingFilesPerCheckpoint.get( - pastCheckpointId)) { + for (String filename : entry.getValue()) { Path finalPath = new Path(filename); - Path pendingPath = new Path(finalPath.getParent(), - pendingPrefix + finalPath.getName()).suffix(pendingSuffix); + Path pendingPath = getPendingPathFor(finalPath); fs.rename(pendingPath, finalPath); - LOG.debug( - "Moving pending file {} to final location after complete checkpoint {}.", - pendingPath, - pastCheckpointId); + LOG.debug("Moving pending file {} to final location after complete checkpoint {}.", + pendingPath, pastCheckpointId); } - checkpointsToRemove.add(pastCheckpointId); + pendingCheckpointsIt.remove(); } } - for (Long toRemove: checkpointsToRemove) { - bucketState.pendingFilesPerCheckpoint.remove(toRemove); - } } } @Override - public BucketState snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkNotNull(restoredBucketStates, + "The " + getClass().getSimpleName() + " has not been properly initialized."); + + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + if (isWriterOpen) { - long pos = writer.flush(); bucketState.currentFile = currentPartPath.toString(); - 
bucketState.currentFileValidLength = pos; + bucketState.currentFileValidLength = writer.flush(); } + synchronized (bucketState.pendingFilesPerCheckpoint) { - bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles); + bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); } bucketState.pendingFiles = new ArrayList<>(); - return bucketState; + + restoredBucketStates.clear(); + restoredBucketStates.add(bucketState); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState); + } } - @Override - public void restoreState(BucketState state) { - bucketState = state; - // we can clean all the pending files since they where renamed to final files - // after this checkpoint was successfull + private void handleRestoredBucketState(BucketState bucketState) { + // we can clean all the pending files since they were renamed to + // final files after this checkpoint was successful + // (we re-start from the last **successful** checkpoint) bucketState.pendingFiles.clear(); - try { - initFileSystem(); - } catch (IOException e) { - LOG.error("Error while creating FileSystem in checkpoint restore.", e); - throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e); - } + if (bucketState.currentFile != null) { - // We were writing to a file when the last checkpoint occured. This file can either + // We were writing to a file when the last checkpoint occurred. This file can either // be still in-progress or became a pending file at some point after the checkpoint. 
- // Either way, we have to truncate it back to a valid state (or write a .valid-length) - // file that specifies up to which length it is valid and rename it to the final name + // Either way, we have to truncate it back to a valid state (or write a .valid-length + // file that specifies up to which length it is valid) and rename it to the final name // before starting a new bucket file. Path partPath = new Path(bucketState.currentFile); try { - Path partPendingPath = new Path(partPath.getParent(), pendingPrefix + partPath.getName()).suffix( - pendingSuffix); - Path partInProgressPath = new Path(partPath.getParent(), inProgressPrefix + partPath.getName()).suffix(inProgressSuffix); + Path partPendingPath = getPendingPathFor(partPath); + Path partInProgressPath = getInProgressPathFor(partPath); if (fs.exists(partPendingPath)) { LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); @@ -660,7 +648,10 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf "it was moved to final location by a previous snapshot restore", bucketState.currentFile); } - refTruncate = reflectTruncate(fs); + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + // truncate it or write a ".valid-length" file to specify up to which point it is valid if (refTruncate != null) { LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength); @@ -711,7 +702,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } else { LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); - Path validLengthFilePath = new Path(partPath.getParent(), validLengthPrefix + partPath.getName()).suffix(validLengthSuffix); + Path validLengthFilePath = getValidLengthPathFor(partPath); if (!fs.exists(validLengthFilePath)) { FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); 
lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); @@ -722,17 +713,16 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf // invalidate in the state object bucketState.currentFile = null; bucketState.currentFileValidLength = -1; + isWriterOpen = false; } catch (IOException e) { LOG.error("Error while restoring RollingSink state.", e); throw new RuntimeException("Error while restoring RollingSink state.", e); } catch (InvocationTargetException | IllegalAccessException e) { - LOG.error("Cound not invoke truncate.", e); + LOG.error("Could not invoke truncate.", e); throw new RuntimeException("Could not invoke truncate.", e); } } - LOG.debug("Clearing pending/in-progress files."); - // Move files that are confirmed by a checkpoint but did not get moved to final location // because the checkpoint notification did not happen before a failure @@ -743,7 +733,7 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf // to their final name for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { Path finalPath = new Path(filename); - Path pendingPath = new Path(finalPath.getParent(), pendingPrefix + finalPath.getName()).suffix(pendingSuffix); + Path pendingPath = getPendingPathFor(finalPath); try { if (fs.exists(pendingPath)) { @@ -756,39 +746,10 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf } } } - bucketState.pendingFiles.clear(); + synchronized (bucketState.pendingFilesPerCheckpoint) { bucketState.pendingFilesPerCheckpoint.clear(); } - - // we need to get this here since open() has not yet been called - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - // delete pending files - try { - - RemoteIterator<LocatedFileStatus> bucketFiles = fs.listFiles(new Path(basePath), true); - - while (bucketFiles.hasNext()) { - LocatedFileStatus file = bucketFiles.next(); - if 
(file.getPath().toString().endsWith(pendingSuffix)) { - // only delete files that contain our subtask index - if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) { - LOG.debug("(RESTORE) Deleting pending file {}", file.getPath().toString()); - fs.delete(file.getPath(), true); - } - } - if (file.getPath().toString().endsWith(inProgressSuffix)) { - // only delete files that contain our subtask index - if (file.getPath().toString().contains(partPrefix + "-" + subtaskIndex + "-")) { - LOG.debug("(RESTORE) Deleting in-progress file {}", file.getPath().toString()); - fs.delete(file.getPath(), true); - } - } - } - } catch (IOException e) { - LOG.error("Error while deleting old pending files: {}", e); - throw new RuntimeException("Error while deleting old pending files.", e); - } } // -------------------------------------------------------------------------------------------- @@ -891,9 +852,12 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf * <p> * This should only be disabled if using the sink without checkpoints, to not remove * the files already in the directory. + * + * @deprecated This option is deprecated and remains only for backwards compatibility. + * We do not clean up lingering files anymore. */ + @Deprecated public RollingSink<T> disableCleanupOnOpen() { - this.cleanupOnOpen = false; return this; } @@ -919,9 +883,9 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf private static final long serialVersionUID = 1L; /** - * The file that was in-progress when the last checkpoint occured. + * The file that was in-progress when the last checkpoint occurred. */ - String currentFile = null; + String currentFile; /** * The valid length of the in-progress file at the time of the last checkpoint. @@ -939,5 +903,14 @@ public class RollingSink<T> extends RichSinkFunction<T> implements InputTypeConf * pending files of completed checkpoints to their final location. 
*/ final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>(); + + @Override + public String toString() { + return + "In-progress=" + currentFile + + " validLength=" + currentFileValidLength + + " pendingForNextCheckpoint=" + pendingFiles + + " pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint; + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java deleted file mode 100644 index 2b93721..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultTolerance2ITCase.java +++ /dev/null @@ -1,292 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ -package org.apache.flink.streaming.connectors.fs; - -import com.google.common.collect.Sets; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; -import org.apache.flink.util.NetUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.Random; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.junit.Assert.assertTrue; - -/** -* Tests for {@link RollingSink}. -* -* <p> -* This test only verifies the exactly once behaviour of the sink. Another test tests the -* rolling behaviour. -* -* <p> -* This differs from RollingSinkFaultToleranceITCase in that the checkpoint interval is extremely -* high. This provokes the case that the sink restarts without any checkpoint having been performed. -* This tests the initial cleanup of pending/in-progress files. -* -* @deprecated should be removed with the {@link RollingSink}. 
-*/ -@Deprecated -public class RollingSinkFaultTolerance2ITCase extends StreamFaultToleranceTestBase { - - final long NUM_STRINGS = 16_000; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - private static MiniDFSCluster hdfsCluster; - private static org.apache.hadoop.fs.FileSystem dfs; - - private static String outPath; - - - - @BeforeClass - public static void createHDFS() throws IOException { - Configuration conf = new Configuration(); - - File dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - outPath = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/string-non-rolling-out-no-checkpoint"; - } - - @AfterClass - public static void destroyHDFS() { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - - - @Override - public void testProgram(StreamExecutionEnvironment env) { - assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); - - int PARALLELISM = 12; - - env.enableCheckpointing(Long.MAX_VALUE); - env.setParallelism(PARALLELISM); - env.disableOperatorChaining(); - - DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain(); - - DataStream<String> mapped = stream - .map(new OnceFailingIdentityMapper(NUM_STRINGS)); - - RollingSink<String> sink = new RollingSink<String>(outPath) - .setBucketer(new NonRollingBucketer()) - .setBatchSize(5000) - .setValidLengthPrefix("") - .setPendingPrefix(""); - - mapped.addSink(sink); - - } - - @Override - public void postSubmit() throws Exception { - // We read the files and verify that we have read all the strings. If a valid-length - // file exists we only read the file to that point. 
(This test should work with - // FileSystems that support truncate() and with others as well.) - - Pattern messageRegex = Pattern.compile("message (\\d*)"); - - // Keep a set of the message IDs that we read. The size must equal the read count and - // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some - // elements twice. - Set<Integer> readNumbers = Sets.newHashSet(); - int numRead = 0; - - RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path( - outPath), true); - - while (files.hasNext()) { - LocatedFileStatus file = files.next(); - - if (!file.getPath().toString().endsWith(".valid-length")) { - int validLength = (int) file.getLen(); - if (dfs.exists(file.getPath().suffix(".valid-length"))) { - FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length")); - String validLengthString = inStream.readUTF(); - validLength = Integer.parseInt(validLengthString); - System.out.println("VALID LENGTH: " + validLength); - } - FSDataInputStream inStream = dfs.open(file.getPath()); - byte[] buffer = new byte[validLength]; - inStream.readFully(0, buffer, 0, validLength); - inStream.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(buffer); - - InputStreamReader inStreamReader = new InputStreamReader(bais); - BufferedReader br = new BufferedReader(inStreamReader); - - String line = br.readLine(); - while (line != null) { - Matcher matcher = messageRegex.matcher(line); - if (matcher.matches()) { - numRead++; - int messageId = Integer.parseInt(matcher.group(1)); - readNumbers.add(messageId); - } else { - Assert.fail("Read line does not match expected pattern."); - } - line = br.readLine(); - } - br.close(); - inStreamReader.close(); - bais.close(); - } - } - - // Verify that we read all strings (at-least-once) - Assert.assertEquals(NUM_STRINGS, readNumbers.size()); - - // Verify that we don't have duplicates (boom!, exactly-once) - Assert.assertEquals(NUM_STRINGS, numRead); - } - - private static class 
OnceFailingIdentityMapper extends RichMapFunction<String, String> { - private static final long serialVersionUID = 1L; - - private static volatile boolean hasFailed = false; - - private final long numElements; - - private long failurePos; - private long count; - - - OnceFailingIdentityMapper(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(org.apache.flink.configuration.Configuration parameters) throws IOException { - long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - count = 0; - } - - @Override - public String map(String value) throws Exception { - count++; - if (!hasFailed && count >= failurePos) { - hasFailed = true; - throw new Exception("Test Failure"); - } - - return value; - } - } - - private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> { - - private static final long serialVersionUID = 1L; - - private final long numElements; - - private int index; - - private volatile boolean isRunning = true; - - - StringGeneratingSourceFunction(long numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<String> ctx) throws Exception { - final Object lockingObject = ctx.getCheckpointLock(); - - final int step = getRuntimeContext().getNumberOfParallelSubtasks(); - - if (index == 0) { - index = getRuntimeContext().getIndexOfThisSubtask(); - } - - while (isRunning && index < numElements) { - - Thread.sleep(1); - synchronized (lockingObject) { - ctx.collect("message " + index); - index += step; - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - private static String randomString(StringBuilder bld, Random rnd) { - final int len = 
rnd.nextInt(10) + 5; - - for (int i = 0; i < len; i++) { - char next = (char) (rnd.nextInt(20000) + 33); - bld.append(next); - } - - return bld.toString(); - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; - } - - @Override - public void restoreState(Integer state) { - index = state; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java index 9c39237..36c0d03 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -42,6 +42,7 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStreamReader; +import java.util.HashSet; import java.util.Random; import java.util.Set; import java.util.regex.Matcher; @@ -71,7 +72,8 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas private static String outPath; - + private static final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; @BeforeClass public static void createHDFS() throws IOException { @@ -97,14 +99,13 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas } } - @Override public void testProgram(StreamExecutionEnvironment env) { assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); int PARALLELISM = 12; - env.enableCheckpointing(200); + env.enableCheckpointing(20); env.setParallelism(PARALLELISM); env.disableOperatorChaining(); @@ -117,7 +118,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas .setBucketer(new NonRollingBucketer()) .setBatchSize(10000) .setValidLengthPrefix("") - .setPendingPrefix(""); + .setPendingPrefix("") + .setPendingSuffix(PENDING_SUFFIX) + .setInProgressSuffix(IN_PROGRESS_SUFFIX); mapped.addSink(sink); @@ -135,7 +138,9 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some // elements twice. 
Set<Integer> readNumbers = Sets.newHashSet(); - int numRead = 0; + + HashSet<String> uniqMessagesRead = new HashSet<>(); + HashSet<String> messagesInCommittedFiles = new HashSet<>(); RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path( outPath), true); @@ -165,7 +170,15 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas while (line != null) { Matcher matcher = messageRegex.matcher(line); if (matcher.matches()) { - numRead++; + uniqMessagesRead.add(line); + + // check that in the committed files there are no duplicates + if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) { + if (!messagesInCommittedFiles.add(line)) { + Assert.fail("Duplicate entry in committed bucket."); + } + } + int messageId = Integer.parseInt(matcher.group(1)); readNumbers.add(messageId); } else { @@ -183,7 +196,7 @@ public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBas Assert.assertEquals(NUM_STRINGS, readNumbers.size()); // Verify that we don't have duplicates (boom!, exactly-once) - Assert.assertEquals(NUM_STRINGS, numRead); + Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size()); } private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> { http://git-wip-us.apache.org/repos/asf/flink/blob/e2b93f69/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index c8440ef..80ae294 100644 --- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericData.StringType; import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.RichFilterFunction; import org.apache.flink.api.common.functions.RichFlatMapFunction; @@ -34,7 +35,12 @@ import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter.AvroKeyValue; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.util.Collector; import org.apache.flink.util.NetUtils; @@ -638,6 +644,243 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { Assert.assertEquals(8, numFiles); } + private static final String PART_PREFIX = "part"; + private static 
final String PENDING_SUFFIX = ".pending"; + private static final String IN_PROGRESS_SUFFIX = ".in-progress"; + private static final String VALID_LENGTH_SUFFIX = ".valid"; + + @Test + public void testBucketStateTransitions() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.open(); + + testHarness.setProcessingTime(0L); + + // we have a bucket size of 5 bytes, so each record will get its own bucket, + // i.e. the bucket should roll after every record. + + testHarness.processElement(new StreamRecord<>("test1", 1L)); + testHarness.processElement(new StreamRecord<>("test2", 1L)); + checkFs(outDir, 1, 1 ,0, 0); + + testHarness.processElement(new StreamRecord<>("test3", 1L)); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.snapshot(0, 0); + checkFs(outDir, 1, 2, 0, 0); + + testHarness.notifyOfCompletedCheckpoint(0); + checkFs(outDir, 1, 0, 2, 0); + + OperatorStateHandles snapshot = testHarness.snapshot(1, 0); + + testHarness.close(); + checkFs(outDir, 0, 1, 2, 0); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + checkFs(outDir, 0, 0, 3, 1); + + snapshot = testHarness.snapshot(2, 0); + + testHarness.processElement(new StreamRecord<>("test4", 10)); + checkFs(outDir, 1, 0, 3, 1); + + testHarness = createRescalingTestSink(outDir, 1, 0); + testHarness.setup(); + testHarness.initializeState(snapshot); + testHarness.open(); + + // the in-progress file remains as we do not clean up now + checkFs(outDir, 1, 0, 3, 1); + + testHarness.close(); + + // at close it is not moved to final because it is not part + // of the current task's state, it was just a not cleaned up leftover. 
+ checkFs(outDir, 1, 0, 3, 1); + } + + @Test + public void testScalingDown() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1); + testHarness2.setup(); + testHarness2.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); + testHarness3.setup(); + testHarness3.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + checkFs(outDir, 1, 0, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test2", 0L)); + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + testHarness2.processElement(new StreamRecord<>("test4", 0L)); + testHarness2.processElement(new StreamRecord<>("test5", 0L)); + testHarness2.processElement(new StreamRecord<>("test6", 0L)); + checkFs(outDir, 2, 4, 0, 0); + + testHarness3.processElement(new StreamRecord<>("test7", 0L)); + testHarness3.processElement(new StreamRecord<>("test8", 0L)); + checkFs(outDir, 3, 5, 0, 0); + + // intentionally we snapshot them in a not ascending order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness3.snapshot(0, 0), + testHarness1.snapshot(0, 0), + testHarness2.snapshot(0, 0) + ); + + // with the above state reshuffling, we expect testHarness4 to take the + // state of the previous testHarness3 and testHarness1 while testHarness5 + // will take that of the previous testHarness2 + + OneInputStreamOperatorTestHarness<String, Object> testHarness4 = createRescalingTestSink(outDir, 2, 0); + testHarness4.setup(); + testHarness4.initializeState(mergedSnapshot); + testHarness4.open(); + + // we do not have a length file for part-2-0 because bucket part-2-0 + // was not 
"in-progress", but "pending" (its full content is valid). + checkFs(outDir, 1, 4, 3, 2); + + OneInputStreamOperatorTestHarness<String, Object> testHarness5 = createRescalingTestSink(outDir, 2, 1); + testHarness5.setup(); + testHarness5.initializeState(mergedSnapshot); + testHarness5.open(); + + checkFs(outDir, 0, 0, 8, 3); + } + + @Test + public void testScalingUp() throws Exception { + final File outDir = tempFolder.newFolder(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 2, 0); + testHarness1.setup(); + testHarness1.open(); + + OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 2, 0); + testHarness2.setup(); + testHarness2.open(); + + testHarness1.processElement(new StreamRecord<>("test1", 0L)); + testHarness1.processElement(new StreamRecord<>("test2", 0L)); + + checkFs(outDir, 1, 1, 0, 0); + + testHarness2.processElement(new StreamRecord<>("test3", 0L)); + testHarness2.processElement(new StreamRecord<>("test4", 0L)); + testHarness2.processElement(new StreamRecord<>("test5", 0L)); + + checkFs(outDir, 2, 3, 0, 0); + + // intentionally we snapshot them in the reverse order so that the states are shuffled + OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + testHarness1 = createRescalingTestSink(outDir, 3, 0); + testHarness1.setup(); + testHarness1.initializeState(mergedSnapshot); + testHarness1.open(); + + checkFs(outDir, 1, 1, 3, 1); + + testHarness2 = createRescalingTestSink(outDir, 3, 1); + testHarness2.setup(); + testHarness2.initializeState(mergedSnapshot); + testHarness2.open(); + + checkFs(outDir, 0, 0, 5, 2); + + OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2); + testHarness3.setup(); + testHarness3.initializeState(mergedSnapshot); + testHarness3.open(); + + checkFs(outDir, 0, 0, 5, 2); + + 
testHarness1.processElement(new StreamRecord<>("test6", 0)); + testHarness2.processElement(new StreamRecord<>("test6", 0)); + testHarness3.processElement(new StreamRecord<>("test6", 0)); + + // 3 for the different tasks + checkFs(outDir, 3, 0, 5, 2); + + testHarness1.snapshot(1, 0); + testHarness2.snapshot(1, 0); + testHarness3.snapshot(1, 0); + + testHarness1.close(); + testHarness2.close(); + testHarness3.close(); + + checkFs(outDir, 0, 3, 5, 2); + } + + private OneInputStreamOperatorTestHarness<String, Object> createRescalingTestSink( + File outDir, int totalParallelism, int taskIdx) throws Exception { + + RollingSink<String> sink = new RollingSink<String>(outDir.getAbsolutePath()) + .setWriter(new StringWriter<String>()) + .setBatchSize(5) + .setPartPrefix(PART_PREFIX) + .setInProgressPrefix("") + .setPendingPrefix("") + .setValidLengthPrefix("") + .setInProgressSuffix(IN_PROGRESS_SUFFIX) + .setPendingSuffix(PENDING_SUFFIX) + .setValidLengthSuffix(VALID_LENGTH_SUFFIX); + + return createTestSink(sink, totalParallelism, taskIdx); + } + + private <T> OneInputStreamOperatorTestHarness<T, Object> createTestSink( + RollingSink<T> sink, int totalParallelism, int taskIdx) throws Exception { + return new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, totalParallelism, taskIdx); + } + + private void checkFs(File outDir, int inprogress, int pending, int completed, int valid) throws IOException { + int inProg = 0; + int pend = 0; + int compl = 0; + int val = 0; + + for (File file: FileUtils.listFiles(outDir, null, true)) { + if (file.getAbsolutePath().endsWith("crc")) { + continue; + } + String path = file.getPath(); + if (path.endsWith(IN_PROGRESS_SUFFIX)) { + inProg++; + } else if (path.endsWith(PENDING_SUFFIX)) { + pend++; + } else if (path.endsWith(VALID_LENGTH_SUFFIX)) { + val++; + } else if (path.contains(PART_PREFIX)) { + compl++; + } + } + + Assert.assertEquals(inprogress, inProg); + Assert.assertEquals(pending, pend); + 
Assert.assertEquals(completed, compl); + Assert.assertEquals(valid, val); + } private static class TestSourceFunction implements SourceFunction<Tuple2<Integer, String>> { private static final long serialVersionUID = 1L;
