http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java deleted file mode 100644 index cf2c373..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java +++ /dev/null @@ -1,1082 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs.bucketing; - -import org.apache.commons.lang3.time.StopWatch; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.InputTypeConfigurable; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; -import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.connectors.fs.Clock; -import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; -import org.apache.flink.streaming.connectors.fs.StringWriter; -import org.apache.flink.streaming.connectors.fs.Writer; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.util.Preconditions; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.Serializable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.Iterator; - -/** - * Sink that emits its input elements to {@link org.apache.hadoop.fs.FileSystem} files within - * buckets. This is integrated with the checkpointing mechanism to provide exactly once semantics. - * - * <p> - * When creating the sink a {@code basePath} must be specified. The base directory contains - * one directory for every bucket. The bucket directories themselves contain several part files, - * one for each parallel subtask of the sink. These part files contain the actual output data. - * - * <p> - * The sink uses a {@link Bucketer} to determine in which bucket directory each element should - * be written to inside the base directory. The {@code Bucketer} can, for example, use time or - * a property of the element to determine the bucket directory. The default {@code Bucketer} is a - * {@link DateTimeBucketer} which will create one new bucket every hour. You can specify - * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}. For example, use the - * {@link BasePathBucketer} if you don't want to have buckets but still want to write part-files - * in a fault-tolerant way. - * - * <p> - * The filenames of the part files contain the part prefix, the parallel subtask index of the sink - * and a rolling counter. For example the file {@code "part-1-17"} contains the data from - * {@code subtask 1} of the sink and is the {@code 17th} bucket created by that subtask. Per default - * the part prefix is {@code "part"} but this can be configured using {@link #setPartPrefix(String)}. - * When a part file becomes bigger than the user-specified batch size the current part file is closed, - * the part counter is increased and a new part file is created. The batch size defaults to {@code 384MB}, - * this can be configured using {@link #setBatchSize(long)}. - * - * <p> - * In some scenarios, the open buckets are required to change based on time. In these cases, the sink - * needs to determine when a bucket has become inactive, in order to flush and close the part file. - * To support this there are two configurable settings: - * <ol> - * <li>the frequency to check for inactive buckets, configured by {@link #setInactiveBucketCheckInterval(long)}, - * and</li> - * <li>the minimum amount of time a bucket has to not receive any data before it is considered inactive, - * configured by {@link #setInactiveBucketThreshold(long)}</li> - * </ol> - * Both of these parameters default to {@code 60,000 ms}, or {@code 1 min}. - * - * <p> - * Part files can be in one of three states: {@code in-progress}, {@code pending} or {@code finished}. - * The reason for this is how the sink works together with the checkpointing mechanism to provide exactly-once - * semantics and fault-tolerance. The part file that is currently being written to is {@code in-progress}. Once - * a part file is closed for writing it becomes {@code pending}. When a checkpoint is successful the currently - * pending files will be moved to {@code finished}. - * - * <p> - * If case of a failure, and in order to guarantee exactly-once semantics, the sink should roll back to the state it - * had when that last successful checkpoint occurred. To this end, when restoring, the restored files in {@code pending} - * state are transferred into the {@code finished} state while any {@code in-progress} files are rolled back, so that - * they do not contain data that arrived after the checkpoint from which we restore. If the {@code FileSystem} supports - * the {@code truncate()} method this will be used to reset the file back to its previous state. If not, a special - * file with the same name as the part file and the suffix {@code ".valid-length"} will be created that contains the - * length up to which the file contains valid data. When reading the file, it must be ensured that it is only read up - * to that point. The prefixes and suffixes for the different file states and valid-length files can be configured - * using the adequate setter method, e.g. {@link #setPendingSuffix(String)}. - * - * <p> - * <b>NOTE:</b> - * <ol> - * <li> - * If checkpointing is not enabled the pending files will never be moved to the finished state. In that case, - * the pending suffix/prefix can be set to {@code ""} to make the sink work in a non-fault-tolerant way but - * still provide output without prefixes and suffixes. - * </li> - * <li> - * The part files are written using an instance of {@link Writer}. By default, a - * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result - * of {@code toString()} for every element, separated by newlines. You can configure the writer using the - * {@link #setWriter(Writer)}. For example, {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} - * can be used to write Hadoop {@code SequenceFiles}. - * </li> - * </ol> - * - * <p> - * Example: - * <pre>{@code - * new BucketingSink<Tuple2<IntWritable, Text>>(outPath) - * .setWriter(new SequenceFileWriter<IntWritable, Text>()) - * .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm") - * }</pre> - * - * This will create a sink that writes to {@code SequenceFiles} and rolls every minute. - * - * @see DateTimeBucketer - * @see StringWriter - * @see SequenceFileWriter - * - * @param <T> Type of the elements emitted by this sink - */ -public class BucketingSink<T> - extends RichSinkFunction<T> - implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener, ProcessingTimeCallback { - - private static final long serialVersionUID = 1L; - - private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class); - - // -------------------------------------------------------------------------------------------- - // User configuration values - // -------------------------------------------------------------------------------------------- - // These are initialized with some defaults but are meant to be changeable by the user - - /** - * The default maximum size of part files (currently {@code 384 MB}). - */ - private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L; - - /** - * The default time between checks for inactive buckets. By default, {60 sec}. - */ - private final long DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS = 60 * 1000L; - - /** - * The default threshold (in {@code ms}) for marking a bucket as inactive and - * closing its part files. By default, {60 sec}. - */ - private final long DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS = 60 * 1000L; - - /** - * The suffix for {@code in-progress} part files. These are files we are - * currently writing to, but which were not yet confirmed by a checkpoint. - */ - private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress"; - - /** - * The prefix for {@code in-progress} part files. These are files we are - * currently writing to, but which were not yet confirmed by a checkpoint. - */ - private final String DEFAULT_IN_PROGRESS_PREFIX = "_"; - - /** - * The suffix for {@code pending} part files. These are closed files that we are - * not currently writing to (inactive or reached {@link #batchSize}), but which - * were not yet confirmed by a checkpoint. - */ - private final String DEFAULT_PENDING_SUFFIX = ".pending"; - - /** - * The prefix for {@code pending} part files. These are closed files that we are - * not currently writing to (inactive or reached {@link #batchSize}), but which - * were not yet confirmed by a checkpoint. - */ - private final String DEFAULT_PENDING_PREFIX = "_"; - - /** - * When {@code truncate()} is not supported by the used {@link FileSystem}, we create - * a file along the part file with this suffix that contains the length up to which - * the part file is valid. - */ - private final String DEFAULT_VALID_SUFFIX = ".valid-length"; - - /** - * When {@code truncate()} is not supported by the used {@link FileSystem}, we create - * a file along the part file with this preffix that contains the length up to which - * the part file is valid. - */ - private final String DEFAULT_VALID_PREFIX = "_"; - - /** - * The default prefix for part files. - */ - private final String DEFAULT_PART_REFIX = "part"; - - /** - * The default timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}). - */ - private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000; - - - /** - * The base {@code Path} that stores all bucket directories. - */ - private final String basePath; - - /** - * The {@code Bucketer} that is used to determine the path of bucket directories. - */ - private Bucketer<T> bucketer; - - /** - * We have a template and call duplicate() for each parallel writer in open() to get the actual - * writer that is used for the part files. - */ - private Writer<T> writerTemplate; - - private long batchSize = DEFAULT_BATCH_SIZE; - private long inactiveBucketCheckInterval = DEFAULT_INACTIVE_BUCKET_CHECK_INTERVAL_MS; - private long inactiveBucketThreshold = DEFAULT_INACTIVE_BUCKET_THRESHOLD_MS; - - // These are the actually configured prefixes/suffixes - private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX; - private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX; - - private String pendingSuffix = DEFAULT_PENDING_SUFFIX; - private String pendingPrefix = DEFAULT_PENDING_PREFIX; - - private String validLengthSuffix = DEFAULT_VALID_SUFFIX; - private String validLengthPrefix= DEFAULT_VALID_PREFIX; - - private String partPrefix = DEFAULT_PART_REFIX; - - /** - * The timeout for asynchronous operations such as recoverLease and truncate (in {@code ms}). - */ - private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS; - - // -------------------------------------------------------------------------------------------- - // Internal fields (not configurable by user) - // -------------------------------------------§------------------------------------------------- - - /** - * We use reflection to get the .truncate() method, this is only available starting with Hadoop 2.7 - */ - private transient Method refTruncate; - - /** - * The state object that is handled by Flink from snapshot/restore. This contains state for - * every open bucket: the current in-progress part file path, its valid length and the pending part files. - */ - private transient State<T> state; - - private transient ListState<State<T>> restoredBucketStates; - - /** - * User-defined FileSystem parameters - */ - private Configuration fsConfig; - - /** - * The FileSystem reference. - */ - private transient FileSystem fs; - - private transient Clock clock; - - private transient ProcessingTimeService processingTimeService; - - /** - * Creates a new {@code BucketingSink} that writes files to the given base directory. - * - * <p> - * This uses a{@link DateTimeBucketer} as {@link Bucketer} and a {@link StringWriter} has writer. - * The maximum bucket size is set to 384 MB. - * - * @param basePath The directory to which to write the bucket files. - */ - public BucketingSink(String basePath) { - this.basePath = basePath; - this.bucketer = new DateTimeBucketer<>(); - this.writerTemplate = new StringWriter<>(); - } - - /** - * Specify a custom {@code Configuration} that will be used when creating - * the {@link FileSystem} for writing. - */ - public BucketingSink<T> setFSConfig(Configuration config) { - this.fsConfig = new Configuration(); - fsConfig.addAll(config); - return this; - } - - /** - * Specify a custom {@code Configuration} that will be used when creating - * the {@link FileSystem} for writing. - */ - public BucketingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) { - this.fsConfig = new Configuration(); - for(Map.Entry<String, String> entry : config) { - fsConfig.setString(entry.getKey(), entry.getValue()); - } - return this; - } - - @Override - @SuppressWarnings("unchecked") - public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { - if (this.writerTemplate instanceof InputTypeConfigurable) { - ((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - Preconditions.checkArgument(this.restoredBucketStates == null, "The operator has already been initialized."); - - initFileSystem(); - - if (this.refTruncate == null) { - this.refTruncate = reflectTruncate(fs); - } - - OperatorStateStore stateStore = context.getOperatorStateStore(); - restoredBucketStates = stateStore.getSerializableListState("bucket-states"); - - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - if (context.isRestored()) { - LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); - - for (State<T> recoveredState : restoredBucketStates.get()) { - handleRestoredBucketState(recoveredState); - if (LOG.isDebugEnabled()) { - LOG.debug("{} idx {} restored {}", getClass().getSimpleName(), subtaskIndex, recoveredState); - } - } - } else { - LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); - } - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - state = new State<>(); - - processingTimeService = - ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(); - - long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); - - processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); - - this.clock = new Clock() { - @Override - public long currentTimeMillis() { - return processingTimeService.getCurrentProcessingTime(); - } - }; - } - - /** - * Create a file system with the user-defined {@code HDFS} configuration. - * @throws IOException - */ - private void initFileSystem() throws IOException { - if (fs != null) { - return; - } - org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); - if (fsConfig != null) { - String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); - hadoopConf.setBoolean(disableCacheName, true); - for (String key : fsConfig.keySet()) { - hadoopConf.set(key, fsConfig.getString(key, null)); - } - } - - fs = new Path(basePath).getFileSystem(hadoopConf); - } - - @Override - public void close() throws Exception { - for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { - closeCurrentPartFile(entry.getValue()); - } - } - - @Override - public void invoke(T value) throws Exception { - Path bucketPath = bucketer.getBucketPath(clock, new Path(basePath), value); - - long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); - - BucketState<T> bucketState = state.getBucketState(bucketPath); - if (bucketState == null) { - bucketState = new BucketState<>(currentProcessingTime); - state.addBucketState(bucketPath, bucketState); - } - - if (shouldRoll(bucketState)) { - openNewPartFile(bucketPath, bucketState); - } - - bucketState.writer.write(value); - bucketState.lastWrittenToTime = currentProcessingTime; - } - - /** - * Returns {@code true} if the current {@code part-file} should be closed and a new should be created. - * This happens if: - * <ol> - * <li>no file is created yet for the task to write to, or</li> - * <li>the current file has reached the maximum bucket size.</li> - * </ol> - */ - private boolean shouldRoll(BucketState<T> bucketState) throws IOException { - boolean shouldRoll = false; - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - if (!bucketState.isWriterOpen) { - shouldRoll = true; - LOG.debug("BucketingSink {} starting new bucket.", subtaskIndex); - } else { - long writePosition = bucketState.writer.getPos(); - if (writePosition > batchSize) { - shouldRoll = true; - LOG.debug( - "BucketingSink {} starting new bucket because file position {} is above batch size {}.", - subtaskIndex, - writePosition, - batchSize); - } - } - return shouldRoll; - } - - @Override - public void onProcessingTime(long timestamp) throws Exception { - long currentProcessingTime = processingTimeService.getCurrentProcessingTime(); - - checkForInactiveBuckets(currentProcessingTime); - - processingTimeService.registerTimer(currentProcessingTime + inactiveBucketCheckInterval, this); - } - - /** - * Checks for inactive buckets, and closes them. Buckets are considered inactive if they have not been - * written to for a period greater than {@code inactiveBucketThreshold} ms. This enables in-progress - * files to be moved to the pending state and be finalised on the next checkpoint. - */ - private void checkForInactiveBuckets(long currentProcessingTime) throws Exception { - - synchronized (state.bucketStates) { - for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { - if (entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) { - LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.", - getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold); - closeCurrentPartFile(entry.getValue()); - } - } - } - } - - /** - * Closes the current part file and opens a new one with a new bucket path, as returned by the - * {@link Bucketer}. If the bucket is not new, then this will create a new file with the same path - * as its predecessor, but with an increased rolling counter (see {@link BucketingSink}. - */ - private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception { - closeCurrentPartFile(bucketState); - - if (!fs.exists(bucketPath)) { - try { - if (fs.mkdirs(bucketPath)) { - LOG.debug("Created new bucket directory: {}", bucketPath); - } - } catch (IOException e) { - throw new RuntimeException("Could not create new bucket path.", e); - } - } - - // The following loop tries different partCounter values in ascending order until it reaches the minimum - // that is not yet used. This works since there is only one parallel subtask that tries names with this - // subtask id. Otherwise we would run into concurrency issues here. This is aligned with the way we now - // clean the base directory in case of rescaling. - - int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); - Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); - while (fs.exists(partPath) || - fs.exists(getPendingPathFor(partPath)) || - fs.exists(getInProgressPathFor(partPath))) { - bucketState.partCounter++; - partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter); - } - - // increase, so we don't have to check for this name next time - bucketState.partCounter++; - - LOG.debug("Next part path is {}", partPath.toString()); - bucketState.currentFile = partPath.toString(); - - Path inProgressPath = getInProgressPathFor(partPath); - if (bucketState.writer == null) { - bucketState.writer = writerTemplate.duplicate(); - } - - bucketState.writer.open(fs, inProgressPath); - bucketState.isWriterOpen = true; - } - - /** - * Closes the current part file and moves it from the in-progress state to the pending state. - */ - private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception { - if (bucketState.isWriterOpen) { - bucketState.writer.close(); - bucketState.isWriterOpen = false; - } - - if (bucketState.currentFile != null) { - Path currentPartPath = new Path(bucketState.currentFile); - Path inProgressPath = getInProgressPathFor(currentPartPath); - Path pendingPath = getPendingPathFor(currentPartPath); - - fs.rename(inProgressPath, pendingPath); - LOG.debug("Moving in-progress bucket {} to pending file {}", - inProgressPath, - pendingPath); - bucketState.pendingFiles.add(currentPartPath.toString()); - bucketState.currentFile = null; - } - } - - /** - * Gets the truncate() call using reflection. - * <p> - * <b>NOTE:</b> This code comes from Flume. - */ - private Method reflectTruncate(FileSystem fs) { - Method m = null; - if(fs != null) { - Class<?> fsClass = fs.getClass(); - try { - m = fsClass.getMethod("truncate", Path.class, long.class); - } catch (NoSuchMethodException ex) { - LOG.debug("Truncate not found. Will write a file with suffix '{}' " + - " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); - return null; - } - - // verify that truncate actually works - FSDataOutputStream outputStream; - Path testPath = new Path(UUID.randomUUID().toString()); - try { - outputStream = fs.create(testPath); - outputStream.writeUTF("hello"); - outputStream.close(); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); - } - - try { - m.invoke(fs, testPath, 2); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.debug("Truncate is not supported.", e); - m = null; - } - - try { - fs.delete(testPath, false); - } catch (IOException e) { - LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file.", e); - } - } - return m; - } - - private Path getPendingPathFor(Path path) { - return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); - } - - private Path getInProgressPathFor(Path path) { - return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); - } - - private Path getValidLengthPathFor(Path path) { - return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - synchronized (state.bucketStates) { - - Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator(); - while (bucketStatesIt.hasNext()) { - BucketState<T> bucketState = bucketStatesIt.next().getValue(); - synchronized (bucketState.pendingFilesPerCheckpoint) { - - Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt = - bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); - - while (pendingCheckpointsIt.hasNext()) { - - Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next(); - Long pastCheckpointId = entry.getKey(); - List<String> pendingPaths = entry.getValue(); - - if (pastCheckpointId <= checkpointId) { - LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); - - for (String filename : pendingPaths) { - Path finalPath = new Path(filename); - Path pendingPath = getPendingPathFor(finalPath); - - fs.rename(pendingPath, finalPath); - LOG.debug( - "Moving pending file {} to final location having completed checkpoint {}.", - pendingPath, - pastCheckpointId); - } - pendingCheckpointsIt.remove(); - } - } - - if (!bucketState.isWriterOpen && - bucketState.pendingFiles.isEmpty() && - bucketState.pendingFilesPerCheckpoint.isEmpty()) { - - // We've dealt with all the pending files and the writer for this bucket is not currently open. - // Therefore this bucket is currently inactive and we can remove it from our state. - bucketStatesIt.remove(); - } - } - } - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); - - restoredBucketStates.clear(); - - synchronized (state.bucketStates) { - int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); - - for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { - BucketState<T> bucketState = bucketStateEntry.getValue(); - - if (bucketState.isWriterOpen) { - bucketState.currentFileValidLength = bucketState.writer.flush(); - } - - synchronized (bucketState.pendingFilesPerCheckpoint) { - bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); - } - bucketState.pendingFiles = new ArrayList<>(); - } - restoredBucketStates.add(state); - - if (LOG.isDebugEnabled()) { - LOG.debug("{} idx {} checkpointed {}.", getClass().getSimpleName(), subtaskIdx, state); - } - } - } - - private void handleRestoredBucketState(State<T> restoredState) { - Preconditions.checkNotNull(restoredState); - - for (BucketState<T> bucketState : restoredState.bucketStates.values()) { - - // we can clean all the pending files since they were renamed to - // final files after this checkpoint was successful - // (we re-start from the last **successful** checkpoint) - bucketState.pendingFiles.clear(); - - if (bucketState.currentFile != null) { - - // We were writing to a file when the last checkpoint occurred. This file can either - // be still in-progress or became a pending file at some point after the checkpoint. - // Either way, we have to truncate it back to a valid state (or write a .valid-length - // file that specifies up to which length it is valid) and rename it to the final name - // before starting a new bucket file. - - Path partPath = new Path(bucketState.currentFile); - try { - Path partPendingPath = getPendingPathFor(partPath); - Path partInProgressPath = getInProgressPathFor(partPath); - - if (fs.exists(partPendingPath)) { - LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); - // has been moved to pending in the mean time, rename to final location - fs.rename(partPendingPath, partPath); - } else if (fs.exists(partInProgressPath)) { - LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath); - // it was still in progress, rename to final path - fs.rename(partInProgressPath, partPath); - } else if (fs.exists(partPath)) { - LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath); - } else { - LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " + - "it was moved to final location by a previous snapshot restore", bucketState.currentFile); - } - - // We use reflection to get the .truncate() method, this - // is only available starting with Hadoop 2.7 - if (this.refTruncate == null) { - this.refTruncate = reflectTruncate(fs); - } - - // truncate it or write a ".valid-length" file to specify up to which point it is valid - if (refTruncate != null) { - LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength); - // some-one else might still hold the lease from a previous try, we are - // recovering, after all ... - if (fs instanceof DistributedFileSystem) { - DistributedFileSystem dfs = (DistributedFileSystem) fs; - LOG.debug("Trying to recover file lease {}", partPath); - dfs.recoverLease(partPath); - boolean isclosed = dfs.isFileClosed(partPath); - StopWatch sw = new StopWatch(); - sw.start(); - while (!isclosed) { - if (sw.getTime() > asyncTimeout) { - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // ignore it - } - isclosed = dfs.isFileClosed(partPath); - } - } - Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength); - if (!truncated) { - LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath); - - // we must wait for the asynchronous truncate operation to complete - StopWatch sw = new StopWatch(); - sw.start(); - long newLen = fs.getFileStatus(partPath).getLen(); - while (newLen != bucketState.currentFileValidLength) { - if (sw.getTime() > asyncTimeout) { - break; - } - try { - Thread.sleep(500); - } catch (InterruptedException e1) { - // ignore it - } - newLen = fs.getFileStatus(partPath).getLen(); - } - if (newLen != bucketState.currentFileValidLength) { - throw new RuntimeException("Truncate did not truncate to right length. Should be " + bucketState.currentFileValidLength + " is " + newLen + "."); - } - } - } else { - LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); - Path validLengthFilePath = getValidLengthPathFor(partPath); - if (!fs.exists(validLengthFilePath)) { - FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); - lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); - lengthFileOut.close(); - } - } - - // Now that we've restored the bucket to a valid state, reset the current file info - bucketState.currentFile = null; - bucketState.currentFileValidLength = -1; - bucketState.isWriterOpen = false; - } catch (IOException e) { - LOG.error("Error while restoring BucketingSink state.", e); - throw new RuntimeException("Error while restoring BucketingSink state.", e); - } catch (InvocationTargetException | IllegalAccessException e) { - LOG.error("Could not invoke truncate.", e); - throw new RuntimeException("Could not invoke truncate.", e); - } - } - - // Move files that are confirmed by a checkpoint but did not get moved to final location - // because the checkpoint notification did not happen before a failure - - LOG.debug("Moving pending files to final location on restore."); - - Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); - for (Long pastCheckpointId : pastCheckpointIds) { - // All the pending files are buckets that have been completed but are waiting to be renamed - // to their final name - for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { - Path finalPath = new Path(filename); - Path pendingPath = getPendingPathFor(finalPath); - - try { - if (fs.exists(pendingPath)) { - LOG.debug("Restoring BucketingSink State: Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId); - fs.rename(pendingPath, finalPath); - } - } catch (IOException e) { - LOG.error("Restoring BucketingSink State: Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e); - throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e); - } - } - } - - synchronized (bucketState.pendingFilesPerCheckpoint) { - bucketState.pendingFilesPerCheckpoint.clear(); - } - } - } - - // -------------------------------------------------------------------------------------------- - // Setters for User configuration values - // -------------------------------------------------------------------------------------------- - - /** - * Sets the maximum bucket size in bytes. - * - * <p> - * When a bucket part file becomes larger than this size a new bucket part file is started and - * the old one is closed. The name of the bucket files depends on the {@link Bucketer}. - * - * @param batchSize The bucket part file size in bytes. - */ - public BucketingSink<T> setBatchSize(long batchSize) { - this.batchSize = batchSize; - return this; - } - - /** - * Sets the default time between checks for inactive buckets. - * - * @param interval The timeout, in milliseconds. - */ - public BucketingSink<T> setInactiveBucketCheckInterval(long interval) { - this.inactiveBucketCheckInterval = interval; - return this; - } - - /** - * Sets the default threshold for marking a bucket as inactive and closing its part files. - * Buckets which haven't been written to for at least this period of time become inactive. - * - * @param threshold The timeout, in milliseconds. - */ - public BucketingSink<T> setInactiveBucketThreshold(long threshold) { - this.inactiveBucketThreshold = threshold; - return this; - } - - /** - * Sets the {@link Bucketer} to use for determining the bucket files to write to. - * - * @param bucketer The bucketer to use. - */ - public BucketingSink<T> setBucketer(Bucketer<T> bucketer) { - this.bucketer = bucketer; - return this; - } - - /** - * Sets the {@link Writer} to be used for writing the incoming elements to bucket files. - * - * @param writer The {@code Writer} to use. - */ - public BucketingSink<T> setWriter(Writer<T> writer) { - this.writerTemplate = writer; - return this; - } - - /** - * Sets the suffix of in-progress part files. The default is {@code "in-progress"}. - */ - public BucketingSink<T> setInProgressSuffix(String inProgressSuffix) { - this.inProgressSuffix = inProgressSuffix; - return this; - } - - /** - * Sets the prefix of in-progress part files. The default is {@code "_"}. - */ - public BucketingSink<T> setInProgressPrefix(String inProgressPrefix) { - this.inProgressPrefix = inProgressPrefix; - return this; - } - - /** - * Sets the suffix of pending part files. The default is {@code ".pending"}. - */ - public BucketingSink<T> setPendingSuffix(String pendingSuffix) { - this.pendingSuffix = pendingSuffix; - return this; - } - - /** - * Sets the prefix of pending part files. The default is {@code "_"}. - */ - public BucketingSink<T> setPendingPrefix(String pendingPrefix) { - this.pendingPrefix = pendingPrefix; - return this; - } - - /** - * Sets the suffix of valid-length files. The default is {@code ".valid-length"}. - */ - public BucketingSink<T> setValidLengthSuffix(String validLengthSuffix) { - this.validLengthSuffix = validLengthSuffix; - return this; - } - - /** - * Sets the prefix of valid-length files. The default is {@code "_"}. - */ - public BucketingSink<T> setValidLengthPrefix(String validLengthPrefix) { - this.validLengthPrefix = validLengthPrefix; - return this; - } - - /** - * Sets the prefix of part files. The default is {@code "part"}. - */ - public BucketingSink<T> setPartPrefix(String partPrefix) { - this.partPrefix = partPrefix; - return this; - } - - /** - * Disable cleanup of leftover in-progress/pending files when the sink is opened. - * - * <p> - * This should only be disabled if using the sink without checkpoints, to not remove - * the files already in the directory. - * - * @deprecated This option is deprecated and remains only for backwards compatibility. - * We do not clean up lingering files anymore. - */ - @Deprecated - public BucketingSink<T> disableCleanupOnOpen() { - return this; - } - - /** - * Sets the default timeout for asynchronous operations such as recoverLease and truncate. - * - * @param timeout The timeout, in milliseconds. - */ - public BucketingSink<T> setAsyncTimeout(long timeout) { - this.asyncTimeout = timeout; - return this; - } - - // -------------------------------------------------------------------------------------------- - // Internal Classes - // -------------------------------------------------------------------------------------------- - - /** - * This is used during snapshot/restore to keep track of in-progress buckets. - * For each bucket, we maintain a state. - */ - static final class State<T> implements Serializable { - private static final long serialVersionUID = 1L; - - /** - * For every bucket directory (key), we maintain a bucket state (value). - */ - final Map<String, BucketState<T>> bucketStates = new HashMap<>(); - - void addBucketState(Path bucketPath, BucketState<T> state) { - synchronized (bucketStates) { - bucketStates.put(bucketPath.toString(), state); - } - } - - BucketState<T> getBucketState(Path bucketPath) { - synchronized (bucketStates) { - return bucketStates.get(bucketPath.toString()); - } - } - - @Override - public String toString() { - return bucketStates.toString(); - } - } - - /** - * This is used for keeping track of the current in-progress buckets and files that we mark - * for moving from pending to final location after we get a checkpoint-complete notification. - */ - static final class BucketState<T> implements Serializable { - private static final long serialVersionUID = 1L; - - /** - * The file that was in-progress when the last checkpoint occurred. - */ - String currentFile; - - /** - * The valid length of the in-progress file at the time of the last checkpoint. - */ - long currentFileValidLength = -1; - - /** - * The time this bucket was last written to. - */ - long lastWrittenToTime; - - /** - * Pending files that accumulated since the last checkpoint. - */ - List<String> pendingFiles = new ArrayList<>(); - - /** - * When doing a checkpoint we move the pending files since the last checkpoint to this map - * with the id of the checkpoint. When we get the checkpoint-complete notification we move - * pending files of completed checkpoints to their final location. - */ - final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>(); - - /** - * For counting the part files inside a bucket directory. Part files follow the pattern - * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter. - */ - private transient int partCounter; - - /** - * Tracks if the writer is currently opened or closed. - */ - private transient boolean isWriterOpen; - - /** - * The actual writer that we user for writing the part files. - */ - private transient Writer<T> writer; - - @Override - public String toString() { - return - "In-progress=" + currentFile + - " validLength=" + currentFileValidLength + - " pendingForNextCheckpoint=" + pendingFiles + - " pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint + - " lastModified@" + lastWrittenToTime; - } - - BucketState(long lastWrittenToTime) { - this.lastWrittenToTime = lastWrittenToTime; - } - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java deleted file mode 100644 index b985e14..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/DateTimeBucketer.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs.bucketing; - -import org.apache.flink.streaming.connectors.fs.Clock; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.text.SimpleDateFormat; -import java.util.Date; - -/** - * A {@link Bucketer} that assigns to buckets based on current system time. - * - * <p> - * The {@code DateTimeBucketer} will create directories of the following form: - * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path - * that was specified as a base path when creating the - * {@link BucketingSink}. The {@code dateTimePath} - * is determined based on the current system time and the user provided format string. - * - * <p> - * {@link SimpleDateFormat} is used to derive a date string from the current system time and - * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling - * files will have a granularity of hours. - * - * - * <p> - * Example: - * - * <pre>{@code - * Bucketer buck = new DateTimeBucketer("yyyy-MM-dd--HH"); - * }</pre> - * - * This will create for example the following bucket path: - * {@code /base/1976-12-31-14/} - * - */ -public class DateTimeBucketer<T> implements Bucketer<T> { - - private static final long serialVersionUID = 1L; - - private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH"; - - private final String formatString; - - private transient SimpleDateFormat dateFormatter; - - /** - * Creates a new {@code DateTimeBucketer} with format string {@code "yyyy-MM-dd--HH"}. - */ - public DateTimeBucketer() { - this(DEFAULT_FORMAT_STRING); - } - - /** - * Creates a new {@code DateTimeBucketer} with the given date/time format string. - * - * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine - * the bucket path. - */ - public DateTimeBucketer(String formatString) { - this.formatString = formatString; - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - - this.dateFormatter = new SimpleDateFormat(formatString); - } - - @Override - public Path getBucketPath(Clock clock, Path basePath, T element) { - String newDateTimeString = dateFormatter.format(new Date(clock.currentTimeMillis())); - return new Path(basePath + "/" + newDateTimeString); - } - - @Override - public String toString() { - return "DateTimeBucketer{" + - "formatString='" + formatString + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties b/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties deleted file mode 100644 index fe60d94..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/resources/log4j.properties +++ /dev/null @@ -1,27 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -log4j.rootLogger=OFF, testlogger - -log4j.appender.testlogger=org.apache.log4j.ConsoleAppender -log4j.appender.testlogger.target = System.err -log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout -log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n - -# suppress the irrelevant (wrong) warnings from the netty channel handler -log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java deleted file mode 100644 index 36c0d03..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkFaultToleranceITCase.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.fs; - -import com.google.common.collect.Sets; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; -import org.apache.flink.util.NetUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.rules.TemporaryFolder; - -import java.io.BufferedReader; -import java.io.ByteArrayInputStream; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.junit.Assert.assertTrue; - -/** - * Tests for {@link org.apache.flink.streaming.connectors.fs.RollingSink}. - * - * <p> - * This test only verifies the exactly once behaviour of the sink. Another test tests the - * rolling behaviour. - * - * @deprecated should be removed with the {@link RollingSink}. - */ -@Deprecated -public class RollingSinkFaultToleranceITCase extends StreamFaultToleranceTestBase { - - final long NUM_STRINGS = 16_000; - - @ClassRule - public static TemporaryFolder tempFolder = new TemporaryFolder(); - - private static MiniDFSCluster hdfsCluster; - private static org.apache.hadoop.fs.FileSystem dfs; - - private static String outPath; - - private static final String PENDING_SUFFIX = ".pending"; - private static final String IN_PROGRESS_SUFFIX = ".in-progress"; - - @BeforeClass - public static void createHDFS() throws IOException { - Configuration conf = new Configuration(); - - File dataDir = tempFolder.newFolder(); - - conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dataDir.getAbsolutePath()); - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); - hdfsCluster = builder.build(); - - dfs = hdfsCluster.getFileSystem(); - - outPath = "hdfs://" - + NetUtils.hostAndPortToUrlString(hdfsCluster.getURI().getHost(), hdfsCluster.getNameNodePort()) - + "/string-non-rolling-out"; - } - - @AfterClass - public static void destroyHDFS() { - if (hdfsCluster != null) { - hdfsCluster.shutdown(); - } - } - - @Override - public void testProgram(StreamExecutionEnvironment env) { - assertTrue("Broken test setup", NUM_STRINGS % 40 == 0); - - int PARALLELISM = 12; - - env.enableCheckpointing(20); - env.setParallelism(PARALLELISM); - env.disableOperatorChaining(); - - DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS)).startNewChain(); - - DataStream<String> mapped = stream - .map(new OnceFailingIdentityMapper(NUM_STRINGS)); - - RollingSink<String> sink = new RollingSink<String>(outPath) - .setBucketer(new NonRollingBucketer()) - .setBatchSize(10000) - .setValidLengthPrefix("") - .setPendingPrefix("") - .setPendingSuffix(PENDING_SUFFIX) - .setInProgressSuffix(IN_PROGRESS_SUFFIX); - - mapped.addSink(sink); - - } - - @Override - public void postSubmit() throws Exception { - // We read the files and verify that we have read all the strings. If a valid-length - // file exists we only read the file to that point. (This test should work with - // FileSystems that support truncate() and with others as well.) - - Pattern messageRegex = Pattern.compile("message (\\d*)"); - - // Keep a set of the message IDs that we read. The size must equal the read count and - // the NUM_STRINGS. If numRead is bigger than the size of the set we have seen some - // elements twice. - Set<Integer> readNumbers = Sets.newHashSet(); - - HashSet<String> uniqMessagesRead = new HashSet<>(); - HashSet<String> messagesInCommittedFiles = new HashSet<>(); - - RemoteIterator<LocatedFileStatus> files = dfs.listFiles(new Path( - outPath), true); - - while (files.hasNext()) { - LocatedFileStatus file = files.next(); - - if (!file.getPath().toString().endsWith(".valid-length")) { - int validLength = (int) file.getLen(); - if (dfs.exists(file.getPath().suffix(".valid-length"))) { - FSDataInputStream inStream = dfs.open(file.getPath().suffix(".valid-length")); - String validLengthString = inStream.readUTF(); - validLength = Integer.parseInt(validLengthString); - System.out.println("VALID LENGTH: " + validLength); - } - FSDataInputStream inStream = dfs.open(file.getPath()); - byte[] buffer = new byte[validLength]; - inStream.readFully(0, buffer, 0, validLength); - inStream.close(); - - ByteArrayInputStream bais = new ByteArrayInputStream(buffer); - - InputStreamReader inStreamReader = new InputStreamReader(bais); - BufferedReader br = new BufferedReader(inStreamReader); - - String line = br.readLine(); - while (line != null) { - Matcher matcher = messageRegex.matcher(line); - if (matcher.matches()) { - uniqMessagesRead.add(line); - - // check that in the committed files there are no duplicates - if (!file.getPath().toString().endsWith(IN_PROGRESS_SUFFIX) && !file.getPath().toString().endsWith(PENDING_SUFFIX)) { - if (!messagesInCommittedFiles.add(line)) { - Assert.fail("Duplicate entry in committed bucket."); - } - } - - int messageId = Integer.parseInt(matcher.group(1)); - readNumbers.add(messageId); - } else { - Assert.fail("Read line does not match expected pattern."); - } - line = br.readLine(); - } - br.close(); - inStreamReader.close(); - bais.close(); - } - } - - // Verify that we read all strings (at-least-once) - Assert.assertEquals(NUM_STRINGS, readNumbers.size()); - - // Verify that we don't have duplicates (boom!, exactly-once) - Assert.assertEquals(NUM_STRINGS, uniqMessagesRead.size()); - } - - private static class OnceFailingIdentityMapper extends RichMapFunction<String, String> { - private static final long serialVersionUID = 1L; - - private static volatile boolean hasFailed = false; - - private final long numElements; - - private long failurePos; - private long count; - - - OnceFailingIdentityMapper(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(org.apache.flink.configuration.Configuration parameters) throws IOException { - long failurePosMin = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.9 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - count = 0; - } - - @Override - public String map(String value) throws Exception { - count++; - if (!hasFailed && count >= failurePos) { - hasFailed = true; - throw new Exception("Test Failure"); - } - - return value; - } - } - - private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> - implements CheckpointedAsynchronously<Integer> { - - private static final long serialVersionUID = 1L; - - private final long numElements; - - private int index; - - private volatile boolean isRunning = true; - - - StringGeneratingSourceFunction(long numElements) { - this.numElements = numElements; - } - - @Override - public void run(SourceContext<String> ctx) throws Exception { - final Object lockingObject = ctx.getCheckpointLock(); - - final int step = getRuntimeContext().getNumberOfParallelSubtasks(); - - if (index == 0) { - index = getRuntimeContext().getIndexOfThisSubtask(); - } - - while (isRunning && index < numElements) { - - Thread.sleep(1); - synchronized (lockingObject) { - ctx.collect("message " + index); - index += step; - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - private static String randomString(StringBuilder bld, Random rnd) { - final int len = rnd.nextInt(10) + 5; - - for (int i = 0; i < len; i++) { - char next = (char) (rnd.nextInt(20000) + 33); - bld.append(next); - } - - return bld.toString(); - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; - } - - @Override - public void restoreState(Integer state) { - index = state; - } - } -}
