http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java new file mode 100644 index 0000000..fc4a35e --- /dev/null +++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java @@ -0,0 +1,916 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.streaming.connectors.fs; + +import org.apache.commons.lang3.time.StopWatch; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink; +import org.apache.flink.util.Preconditions; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +/** + * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This + * is integrated with the checkpointing mechanism to provide exactly once semantics. + * + * <p> + * When creating the sink a {@code basePath} must be specified. The base directory contains + * one directory for every bucket. The bucket directories themselves contain several part files. + * These contain the actual written data. 
+ * + * <p> + * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the + * base directory. Whenever the {@code Bucketer} returns a different directory name than + * it returned before the sink will close the current part files inside that bucket + * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with + * date format string {@code ""yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer} + * using {@link #setBucketer(Bucketer)}. For example, use + * {@link NonRollingBucketer} if you don't want to have + * buckets but still write part files in a fault-tolerant way. + * + * <p> + * The filenames of the part files contain the part prefix, the parallel subtask index of the sink + * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is + * {@code "part"} but this can be + * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger + * than the batch size the current part file is closed, the part counter is increased and + * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured + * using {@link #setBatchSize(long)}. + * + * <p> + * Part files can be in one of three states: in-progress, pending or finished. The reason for this + * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics + * and fault-tolerance. The part file that is currently being written to is in-progress. Once + * a part file is closed for writing it becomes pending. When a checkpoint is successful the + * currently pending files will be moved to finished. If a failure occurs the pending files + * will be deleted to reset state to the last checkpoint. The data in in-progress files will + * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call + * this will be used to reset the file back to a previous state. 
If not, a special file + * with the same name as the part file and the suffix {@code ".valid-length"} will be written + * that contains the length up to which the file contains valid data. When reading the file + * it must be ensured that it is only read up to that point. The prefixes and suffixes for + * the different file states and valid-length files can be configured, for example with + * {@link #setPendingSuffix(String)}. + * + * <p> + * Note: If checkpointing is not enabled the pending files will never be moved to the finished state. + * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work + * in a non-fault-tolerant way but still provide output without prefixes and suffixes. + * + * <p> + * The part files are written using an instance of {@link Writer}. By default + * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result + * of {@code toString()} for every element. Separated by newlines. You can configure the writer + * using {@link #setWriter(Writer)}. For example, + * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write + * Hadoop {@code SequenceFiles}. + * + * <p> + * Example: + * + * <pre>{@code + * new RollingSink<Tuple2<IntWritable, Text>>(outPath) + * .setWriter(new SequenceFileWriter<IntWritable, Text>()) + * .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm") + * }</pre> + * + * This will create a sink that writes to {@code SequenceFiles} and rolls every minute. + * + * @see DateTimeBucketer + * @see StringWriter + * @see SequenceFileWriter + * + * @param <T> Type of the elements emitted by this sink + * + * @deprecated use {@link BucketingSink} instead. 
+ */ +@Deprecated +public class RollingSink<T> extends RichSinkFunction<T> + implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener { + + private static final long serialVersionUID = 1L; + + private static Logger LOG = LoggerFactory.getLogger(RollingSink.class); + + + // -------------------------------------------------------------------------------------------- + // User configuration values + // -------------------------------------------------------------------------------------------- + // These are initialized with some defaults but are meant to be changeable by the user + + /** + * The default maximum size of part files (currently {@code 384 MB}). + */ + private final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L; + + /** + * This is used for part files that we are writing to but which where not yet confirmed + * by a checkpoint. + */ + private final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress"; + + /** + * See above, but for prefix + */ + private final String DEFAULT_IN_PROGRESS_PREFIX = "_"; + + /** + * This is used for part files that we are not writing to but which are not yet confirmed by + * checkpoint. + */ + private final String DEFAULT_PENDING_SUFFIX = ".pending"; + + /** + * See above, but for prefix. + */ + private final String DEFAULT_PENDING_PREFIX = "_"; + + /** + * When truncate() is not supported on the used FileSystem we instead write a + * file along the part file with this ending that contains the length up to which + * the part file is valid. + */ + private final String DEFAULT_VALID_SUFFIX = ".valid-length"; + + /** + * See above, but for prefix. + */ + private final String DEFAULT_VALID_PREFIX = "_"; + + /** + * The default prefix for part files. + */ + private final String DEFAULT_PART_REFIX = "part"; + + /** + * The default timeout for asynchronous operations such as recoverLease and truncate. In + * milliseconds. 
+ */ + private final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000; + + + /** + * The base {@code Path} that stores all bucket directories. + */ + private final String basePath; + + /** + * The {@code Bucketer} that is used to determine the path of bucket directories. + */ + private Bucketer bucketer; + + /** + * We have a template and call duplicate() for each parallel writer in open() to get the actual + * writer that is used for the part files. + */ + private Writer<T> writerTemplate; + + /** + * The actual writer that we user for writing the part files. + */ + private Writer<T> writer; + + /** + * Maximum size of part files. If files exceed this we close and create a new one in the same + * bucket directory. + */ + private long batchSize; + + // These are the actually configured prefixes/suffixes + private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX; + private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX; + + private String pendingSuffix = DEFAULT_PENDING_SUFFIX; + private String pendingPrefix = DEFAULT_PENDING_PREFIX; + + private String validLengthSuffix = DEFAULT_VALID_SUFFIX; + private String validLengthPrefix= DEFAULT_VALID_PREFIX; + + private String partPrefix = DEFAULT_PART_REFIX; + + /** + * The timeout for asynchronous operations such as recoverLease and truncate. In + * milliseconds. + */ + private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS; + + // -------------------------------------------------------------------------------------------- + // Internal fields (not configurable by user) + // -------------------------------------------------------------------------------------------- + + + /** + * The part file that we are currently writing to. + */ + private transient Path currentPartPath; + + /** + * The bucket directory that we are currently filling. + */ + private transient Path currentBucketDirectory; + + /** + * For counting the part files inside a bucket directory. 
Part files follow the patter + * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter. + */ + private transient int partCounter; + + /** + * Tracks if the writer is currently opened or closed. + */ + private transient boolean isWriterOpen; + + /** + * We use reflection to get the .truncate() method, this is only available starting with + * Hadoop 2.7 + */ + private transient Method refTruncate; + + /** + * The state object that is handled by flink from snapshot/restore. In there we store the + * current part file path, the valid length of the in-progress files and pending part files. + */ + private transient BucketState bucketState; + + private transient ListState<BucketState> restoredBucketStates; + + /** + * User-defined FileSystem parameters. + */ + private Configuration fsConfig; + + /** + * The FileSystem reference. + */ + private transient FileSystem fs; + /** + * Creates a new {@code RollingSink} that writes files to the given base directory. + * + * <p> + * This uses a{@link DateTimeBucketer} as bucketer and a {@link StringWriter} has writer. + * The maximum bucket size is set to 384 MB. + * + * @param basePath The directory to which to write the bucket files. + */ + public RollingSink(String basePath) { + this.basePath = basePath; + this.bucketer = new DateTimeBucketer(); + this.batchSize = DEFAULT_BATCH_SIZE; + this.writerTemplate = new StringWriter<>(); + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. + */ + public RollingSink<T> setFSConfig(Configuration config) { + this.fsConfig = new Configuration(); + fsConfig.addAll(config); + return this; + } + + /** + * Specify a custom {@code Configuration} that will be used when creating + * the {@link FileSystem} for writing. 
+ */ + public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) { + this.fsConfig = new Configuration(); + for(Map.Entry<String, String> entry : config) { + fsConfig.setString(entry.getKey(), entry.getValue()); + } + return this; + } + + @Override + @SuppressWarnings("unchecked") + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + if (this.writerTemplate instanceof InputTypeConfigurable) { + ((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + Preconditions.checkArgument(this.restoredBucketStates == null, + "The " + getClass().getSimpleName() + " has already been initialized."); + + initFileSystem(); + + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + OperatorStateStore stateStore = context.getOperatorStateStore(); + restoredBucketStates = stateStore.getSerializableListState("rolling-states"); + + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex); + + for (BucketState bucketState : restoredBucketStates.get()) { + handleRestoredBucketState(bucketState); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, bucketState); + } + } else { + LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex); + } + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + partCounter = 0; + + this.writer = writerTemplate.duplicate(); + + bucketState = new BucketState(); + } + + /** + * Create a file system with the user-defined hdfs config + * @throws IOException + */ + private void initFileSystem() throws IOException { + if (fs != null) { + return; + } + 
org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration(); + if (fsConfig != null) { + String disableCacheName = String.format("fs.%s.impl.disable.cache", new Path(basePath).toUri().getScheme()); + hadoopConf.setBoolean(disableCacheName, true); + for (String key : fsConfig.keySet()) { + hadoopConf.set(key, fsConfig.getString(key, null)); + } + } + + fs = new Path(basePath).getFileSystem(hadoopConf); + } + + @Override + public void close() throws Exception { + closeCurrentPartFile(); + } + + @Override + public void invoke(T value) throws Exception { + if (shouldRoll()) { + openNewPartFile(); + } + writer.write(value); + } + + /** + * Determines whether we should change the bucket file we are writing to. + * + * <p> + * This will roll if no file was created yet, if the file size is larger than the specified size + * or if the {@code Bucketer} determines that we should roll. + */ + private boolean shouldRoll() throws IOException { + boolean shouldRoll = false; + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + if (!isWriterOpen) { + shouldRoll = true; + LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex); + } + if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) { + shouldRoll = true; + LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer); + // we will retrieve a new bucket base path in openNewPartFile so reset the part counter + partCounter = 0; + } + if (isWriterOpen) { + long writePosition = writer.getPos(); + if (isWriterOpen && writePosition > batchSize) { + shouldRoll = true; + LOG.debug( + "RollingSink {} starting new bucket because file position {} is above batch size {}.", + subtaskIndex, + writePosition, + batchSize); + } + } + return shouldRoll; + } + + /** + * Opens a new part file. + * + * <p> + * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}. 
+ */ + private void openNewPartFile() throws Exception { + closeCurrentPartFile(); + + Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath)); + + if (!newBucketDirectory.equals(currentBucketDirectory)) { + currentBucketDirectory = newBucketDirectory; + try { + if (fs.mkdirs(currentBucketDirectory)) { + LOG.debug("Created new bucket directory: {}", currentBucketDirectory); + } + } catch (IOException e) { + throw new RuntimeException("Could not create base path for new rolling file.", e); + } + } + + int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask(); + currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter); + + // This should work since there is only one parallel subtask that tries names with + // our subtask id. Otherwise we would run into concurrency issues here. + while (fs.exists(currentPartPath) || + fs.exists(getPendingPathFor(currentPartPath)) || + fs.exists(getInProgressPathFor(currentPartPath))) { + partCounter++; + currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter); + } + + // increase, so we don't have to check for this name next time + partCounter++; + + LOG.debug("Next part path is {}", currentPartPath.toString()); + + Path inProgressPath = getInProgressPathFor(currentPartPath); + writer.open(fs, inProgressPath); + isWriterOpen = true; + } + + private Path getPendingPathFor(Path path) { + return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); + } + + private Path getInProgressPathFor(Path path) { + return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); + } + + private Path getValidLengthPathFor(Path path) { + return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); + } + + /** + * Closes the current part file. 
+ * + * <p> + * This moves the current in-progress part file to a pending file and adds it to the list + * of pending files in our bucket state. + */ + private void closeCurrentPartFile() throws Exception { + if (isWriterOpen) { + writer.close(); + isWriterOpen = false; + } + + if (currentPartPath != null) { + Path inProgressPath = getInProgressPathFor(currentPartPath); + Path pendingPath = getPendingPathFor(currentPartPath); + fs.rename(inProgressPath, pendingPath); + LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath); + this.bucketState.pendingFiles.add(currentPartPath.toString()); + } + } + + /** + * Gets the truncate() call using reflection. + * <p> + * <b>NOTE: </b>This code comes from Flume + */ + private Method reflectTruncate(FileSystem fs) { + Method m = null; + if (fs != null) { + Class<?> fsClass = fs.getClass(); + try { + m = fsClass.getMethod("truncate", Path.class, long.class); + } catch (NoSuchMethodException ex) { + LOG.debug("Truncate not found. 
Will write a file with suffix '{}' " + + " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); + return null; + } + + // verify that truncate actually works + FSDataOutputStream outputStream; + Path testPath = new Path(UUID.randomUUID().toString()); + try { + outputStream = fs.create(testPath); + outputStream.writeUTF("hello"); + outputStream.close(); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works.", e); + } + + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } + + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file.", e); + } + } + return m; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + synchronized (bucketState.pendingFilesPerCheckpoint) { + Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt = + bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); + + while (pendingCheckpointsIt.hasNext()) { + Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next(); + Long pastCheckpointId = entry.getKey(); + + if (pastCheckpointId <= checkpointId) { + LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); + // All the pending files are buckets that have been completed but are waiting to be renamed + // to their final name + for (String filename : entry.getValue()) { + Path finalPath = new Path(filename); + Path pendingPath = getPendingPathFor(finalPath); + + fs.rename(pendingPath, finalPath); + LOG.debug("Moving pending file {} to final location after complete checkpoint {}.", + pendingPath, pastCheckpointId); + } + 
pendingCheckpointsIt.remove(); + } + } + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkNotNull(restoredBucketStates, + "The " + getClass().getSimpleName() + " has not been properly initialized."); + + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + + if (isWriterOpen) { + bucketState.currentFile = currentPartPath.toString(); + bucketState.currentFileValidLength = writer.flush(); + } + + synchronized (bucketState.pendingFilesPerCheckpoint) { + bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); + } + bucketState.pendingFiles = new ArrayList<>(); + + restoredBucketStates.clear(); + restoredBucketStates.add(bucketState); + + if (LOG.isDebugEnabled()) { + LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState); + } + } + + private void handleRestoredBucketState(BucketState bucketState) { + // we can clean all the pending files since they were renamed to + // final files after this checkpoint was successful + // (we re-start from the last **successful** checkpoint) + bucketState.pendingFiles.clear(); + + if (bucketState.currentFile != null) { + // We were writing to a file when the last checkpoint occurred. This file can either + // be still in-progress or became a pending file at some point after the checkpoint. + // Either way, we have to truncate it back to a valid state (or write a .valid-length + // file that specifies up to which length it is valid) and rename it to the final name + // before starting a new bucket file. 
+ Path partPath = new Path(bucketState.currentFile); + try { + Path partPendingPath = getPendingPathFor(partPath); + Path partInProgressPath = getInProgressPathFor(partPath); + + if (fs.exists(partPendingPath)) { + LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath); + // has been moved to pending in the mean time, rename to final location + fs.rename(partPendingPath, partPath); + } else if (fs.exists(partInProgressPath)) { + LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath); + // it was still in progress, rename to final path + fs.rename(partInProgressPath, partPath); + } else if (fs.exists(partPath)) { + LOG.debug("In-Progress file {} was already moved to final location {}.", bucketState.currentFile, partPath); + } else { + LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " + + "it was moved to final location by a previous snapshot restore", bucketState.currentFile); + } + + if (this.refTruncate == null) { + this.refTruncate = reflectTruncate(fs); + } + + // truncate it or write a ".valid-length" file to specify up to which point it is valid + if (refTruncate != null) { + LOG.debug("Truncating {} to valid length {}", partPath, bucketState.currentFileValidLength); + // some-one else might still hold the lease from a previous try, we are + // recovering, after all ... 
+ if (fs instanceof DistributedFileSystem) { + DistributedFileSystem dfs = (DistributedFileSystem) fs; + LOG.debug("Trying to recover file lease {}", partPath); + dfs.recoverLease(partPath); + boolean isclosed= dfs.isFileClosed(partPath); + StopWatch sw = new StopWatch(); + sw.start(); + while(!isclosed) { + if(sw.getTime() > asyncTimeout) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it + } + isclosed = dfs.isFileClosed(partPath); + } + } + Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, bucketState.currentFileValidLength); + if (!truncated) { + LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath); + + // we must wait for the asynchronous truncate operation to complete + StopWatch sw = new StopWatch(); + sw.start(); + long newLen = fs.getFileStatus(partPath).getLen(); + while(newLen != bucketState.currentFileValidLength) { + if(sw.getTime() > asyncTimeout) { + break; + } + try { + Thread.sleep(500); + } catch (InterruptedException e1) { + // ignore it + } + newLen = fs.getFileStatus(partPath).getLen(); + } + if (newLen != bucketState.currentFileValidLength) { + throw new RuntimeException("Truncate did not truncate to right length. 
Should be " + bucketState.currentFileValidLength + " is " + newLen + "."); + } + } + + } else { + LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, bucketState.currentFileValidLength); + Path validLengthFilePath = getValidLengthPathFor(partPath); + if (!fs.exists(validLengthFilePath)) { + FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath); + lengthFileOut.writeUTF(Long.toString(bucketState.currentFileValidLength)); + lengthFileOut.close(); + } + } + + // invalidate in the state object + bucketState.currentFile = null; + bucketState.currentFileValidLength = -1; + isWriterOpen = false; + } catch (IOException e) { + LOG.error("Error while restoring RollingSink state.", e); + throw new RuntimeException("Error while restoring RollingSink state.", e); + } catch (InvocationTargetException | IllegalAccessException e) { + LOG.error("Could not invoke truncate.", e); + throw new RuntimeException("Could not invoke truncate.", e); + } + } + + // Move files that are confirmed by a checkpoint but did not get moved to final location + // because the checkpoint notification did not happen before a failure + + Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); + LOG.debug("Moving pending files to final location on restore."); + for (Long pastCheckpointId : pastCheckpointIds) { + // All the pending files are buckets that have been completed but are waiting to be renamed + // to their final name + for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { + Path finalPath = new Path(filename); + Path pendingPath = getPendingPathFor(finalPath); + + try { + if (fs.exists(pendingPath)) { + LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId); + fs.rename(pendingPath, finalPath); + } + } catch (IOException e) { + LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, 
e); + throw new RuntimeException("Error while renaming pending file " + pendingPath+ " to final path " + finalPath, e); + } + } + } + + synchronized (bucketState.pendingFilesPerCheckpoint) { + bucketState.pendingFilesPerCheckpoint.clear(); + } + } + + // -------------------------------------------------------------------------------------------- + // Setters for User configuration values + // -------------------------------------------------------------------------------------------- + + /** + * Sets the maximum bucket size in bytes. + * + * <p> + * When a bucket part file becomes larger than this size a new bucket part file is started and + * the old one is closed. The name of the bucket files depends on the {@link Bucketer}. + * + * @param batchSize The bucket part file size in bytes. + */ + public RollingSink<T> setBatchSize(long batchSize) { + this.batchSize = batchSize; + return this; + } + + /** + * Sets the {@link Bucketer} to use for determining the bucket files to write to. + * + * @param bucketer The bucketer to use. + */ + public RollingSink<T> setBucketer(Bucketer bucketer) { + this.bucketer = bucketer; + return this; + } + + /** + * Sets the {@link Writer} to be used for writing the incoming elements to bucket files. + * + * @param writer The {@code Writer} to use. + */ + public RollingSink<T> setWriter(Writer<T> writer) { + this.writerTemplate = writer; + return this; + } + + /** + * Sets the suffix of in-progress part files. The default is {@code "in-progress"}. + */ + public RollingSink<T> setInProgressSuffix(String inProgressSuffix) { + this.inProgressSuffix = inProgressSuffix; + return this; + } + + /** + * Sets the prefix of in-progress part files. The default is {@code "_"}. + */ + public RollingSink<T> setInProgressPrefix(String inProgressPrefix) { + this.inProgressPrefix = inProgressPrefix; + return this; + } + + /** + * Sets the suffix of pending part files. The default is {@code ".pending"}. 
+ */ + public RollingSink<T> setPendingSuffix(String pendingSuffix) { + this.pendingSuffix = pendingSuffix; + return this; + } + + /** + * Sets the prefix of pending part files. The default is {@code "_"}. + */ + public RollingSink<T> setPendingPrefix(String pendingPrefix) { + this.pendingPrefix = pendingPrefix; + return this; + } + + /** + * Sets the suffix of valid-length files. The default is {@code ".valid-length"}. + */ + public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) { + this.validLengthSuffix = validLengthSuffix; + return this; + } + + /** + * Sets the prefix of valid-length files. The default is {@code "_"}. + */ + public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) { + this.validLengthPrefix = validLengthPrefix; + return this; + } + + /** + * Sets the prefix of part files. The default is {@code "part"}. + */ + public RollingSink<T> setPartPrefix(String partPrefix) { + this.partPrefix = partPrefix; + return this; + } + + /** + * Disable cleanup of leftover in-progress/pending files when the sink is opened. + * + * <p> + * This should only be disabled if using the sink without checkpoints, to not remove + * the files already in the directory. + * + * @deprecated This option is deprecated and remains only for backwards compatibility. + * We do not clean up lingering files anymore. + */ + @Deprecated + public RollingSink<T> disableCleanupOnOpen() { + return this; + } + + /** + * Sets the default timeout for asynchronous operations such as recoverLease and truncate. + * + * @param timeout The timeout, in milliseconds. 
+ */ + public RollingSink<T> setAsyncTimeout(long timeout) { + this.asyncTimeout = timeout; + return this; + } + + // -------------------------------------------------------------------------------------------- + // Internal Classes + // -------------------------------------------------------------------------------------------- + + /** + * This is used for keeping track of the current in-progress files and files that we mark + * for moving from pending to final location after we get a checkpoint-complete notification. + */ + static final class BucketState implements Serializable { + private static final long serialVersionUID = 1L; + + /** + * The file that was in-progress when the last checkpoint occurred. + */ + String currentFile; + + /** + * The valid length of the in-progress file at the time of the last checkpoint. + */ + long currentFileValidLength = -1; + + /** + * Pending files that accumulated since the last checkpoint. + */ + List<String> pendingFiles = new ArrayList<>(); + + /** + * When doing a checkpoint we move the pending files since the last checkpoint to this map + * with the id of the checkpoint. When we get the checkpoint-complete notification we move + * pending files of completed checkpoints to their final location. + */ + final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>(); + + @Override + public String toString() { + return + "In-progress=" + currentFile + + " validLength=" + currentFileValidLength + + " pendingForNextCheckpoint=" + pendingFiles + + " pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint; + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
new file mode 100644
index 0000000..08c0d0a
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
+ * The input to the {@link BucketingSink} must
+ * be a {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadoop
+ * {@link org.apache.hadoop.io.Writable Writables}.
+ *
+ * @param <K> The type of the first tuple field.
+ * @param <V> The type of the second tuple field.
+ */
+public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
+	private static final long serialVersionUID = 1L;
+
+	private final String compressionCodecName;
+
+	private final SequenceFile.CompressionType compressionType;
+
+	private transient SequenceFile.Writer writer;
+
+	private Class<K> keyClass;
+
+	private Class<V> valueClass;
+
+	/**
+	 * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
+	 */
+	public SequenceFileWriter() {
+		this("None", SequenceFile.CompressionType.NONE);
+	}
+
+	/**
+	 * Creates a new {@code SequenceFileWriter} that writes sequence files with the given
+	 * compression codec and compression type.
+	 *
+	 * @param compressionCodecName Name of a Hadoop Compression Codec, or {@code "None"} for no codec.
+	 * @param compressionType The compression type to use.
+	 */
+	public SequenceFileWriter(String compressionCodecName,
+			SequenceFile.CompressionType compressionType) {
+		this.compressionCodecName = compressionCodecName;
+		this.compressionType = compressionType;
+	}
+
+	@Override
+	public void open(FileSystem fs, Path path) throws IOException {
+		super.open(fs, path);
+		if (keyClass == null) {
+			throw new IllegalStateException("Key Class has not been initialized.");
+		}
+		if (valueClass == null) {
+			throw new IllegalStateException("Value Class has not been initialized.");
+		}
+
+		CompressionCodec codec = null;
+
+		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
+
+		if (!compressionCodecName.equals("None")) {
+			CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
+			codec = codecFactory.getCodecByName(compressionCodecName);
+			if (codec == null) {
+				throw new RuntimeException("Codec " + compressionCodecName + " not found.");
+			}
+		}
+
+		// the non-deprecated constructor syntax is only available in recent hadoop versions...
+		writer = SequenceFile.createWriter(conf,
+				getStream(),
+				keyClass,
+				valueClass,
+				compressionType,
+				codec);
+	}
+
+	@Override
+	public void close() throws IOException {
+		try {
+			if (writer != null) {
+				writer.close();
+			}
+		} finally {
+			// release the underlying stream even if closing the SequenceFile writer failed
+			super.close();
+		}
+	}
+
+	@Override
+	public void write(Tuple2<K, V> element) throws IOException {
+		getStream(); // Throws if the stream is not open
+		writer.append(element.f0, element.f1);
+	}
+
+	@Override
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (!type.isTupleType()) {
+			throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
+		}
+
+		TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
+
+		if (tupleType.getArity() != 2) {
+			throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
+		}
+
+		TypeInformation<K> keyType = tupleType.getTypeAt(0);
+		TypeInformation<V> valueType = tupleType.getTypeAt(1);
+
+		this.keyClass = keyType.getTypeClass();
+		this.valueClass = valueType.getTypeClass();
+	}
+
+	@Override
+	public Writer<Tuple2<K, V>> duplicate() {
+		SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
+		result.keyClass = keyClass;
+		result.valueClass = valueClass;
+		return result;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
new file mode 100644
index 0000000..140246f
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
+ */
+public abstract class StreamWriterBase<T> implements Writer<T> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);
+
+	/**
+	 * The {@code FSDataOutputStream} for the current part file.
+	 */
+	private transient FSDataOutputStream outStream;
+
+	/**
+	 * We use reflection to get the hflush method or use sync as a fallback.
+	 * The idea for this and the code comes from the Flume HDFS Sink.
+	 */
+	private transient Method refHflushOrSync;
+
+	/**
+	 * Returns the current output stream, if the stream is open.
+	 */
+	protected FSDataOutputStream getStream() {
+		if (outStream == null) {
+			throw new IllegalStateException("Output stream has not been opened");
+		}
+		return outStream;
+	}
+
+	/**
+	 * If hflush is available in this version of HDFS, then this method calls
+	 * hflush, else it calls sync.
+	 * @param os The stream to flush/sync
+	 * @throws IOException If the reflectively invoked hflush/sync call fails with an IOException.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	protected void hflushOrSync(FSDataOutputStream os) throws IOException {
+		try {
+			// refHflushOrSync is resolved in open(); before that it is null, and the resulting
+			// NullPointerException is wrapped into a RuntimeException by the catch below.
+			this.refHflushOrSync.invoke(os);
+		} catch (InvocationTargetException e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e.getCause());
+			Throwable cause = e.getCause();
+			if (cause instanceof IOException) {
+				throw (IOException) cause;
+			}
+			throw new RuntimeException(msg, e);
+		} catch (Exception e) {
+			String msg = "Error while trying to hflushOrSync!";
+			LOG.error(msg + " " + e);
+			throw new RuntimeException(msg, e);
+		}
+	}
+
+	/**
+	 * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
+	 *
+	 * <p>
+	 * Note: This code comes from Flume
+	 */
+	private Method reflectHflushOrSync(FSDataOutputStream os) {
+		Method m = null;
+		if (os != null) {
+			Class<?> fsDataOutputStreamClass = os.getClass();
+			try {
+				m = fsDataOutputStreamClass.getMethod("hflush");
+			} catch (NoSuchMethodException ex) {
+				LOG.debug("HFlush not found. Will use sync() instead");
+				try {
+					m = fsDataOutputStreamClass.getMethod("sync");
+				} catch (Exception ex1) {
+					String msg = "Neither hflush nor sync were found. That seems to be " +
+						"a problem!";
+					LOG.error(msg);
+					throw new RuntimeException(msg, ex1);
+				}
+			}
+		}
+		return m;
+	}
+
+	@Override
+	public void open(FileSystem fs, Path path) throws IOException {
+		if (outStream != null) {
+			throw new IllegalStateException("Writer has already been opened");
+		}
+		outStream = fs.create(path, false);
+		if (refHflushOrSync == null) {
+			refHflushOrSync = reflectHflushOrSync(outStream);
+		}
+	}
+
+	@Override
+	public long flush() throws IOException {
+		if (outStream == null) {
+			throw new IllegalStateException("Writer is not open");
+		}
+		hflushOrSync(outStream);
+		return outStream.getPos();
+	}
+
+	@Override
+	public long getPos() throws IOException {
+		if (outStream == null) {
+			throw new IllegalStateException("Writer is not open");
+		}
+		return outStream.getPos();
+	}
+
+	@Override
+	public void close() throws IOException {
+		if (outStream != null) {
+			try {
+				flush();
+			} finally {
+				// clear the reference before closing: open() refuses to run while outStream is set,
+				// so a failing flush or close must not leave the writer permanently "open"
+				FSDataOutputStream stream = outStream;
+				outStream = null;
+				stream.close();
+			}
+		}
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
new file mode 100644
index 0000000..6568a86
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.IllegalCharsetNameException;
+import java.nio.charset.UnsupportedCharsetException;
+
+/**
+ * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
+ * the output bucket file separated by newline.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public class StringWriter<T> extends StreamWriterBase<T> {
+	private static final long serialVersionUID = 1L;
+
+	private final String charsetName;
+
+	private transient Charset charset;
+
+	/**
+	 * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
+	 * strings to bytes.
+	 */
+	public StringWriter() {
+		this("UTF-8");
+	}
+
+	/**
+	 * Creates a new {@code StringWriter} that uses the given charset to convert
+	 * strings to bytes.
+	 *
+	 * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
+	 */
+	public StringWriter(String charsetName) {
+		this.charsetName = charsetName;
+	}
+
+	@Override
+	public void open(FileSystem fs, Path path) throws IOException {
+		// resolve the charset before opening the stream, so that an invalid charset name fails
+		// without leaving an already opened part file behind
+		try {
+			this.charset = Charset.forName(charsetName);
+		} catch (IllegalCharsetNameException e) {
+			throw new IOException("The charset " + charsetName + " is not valid.", e);
+		} catch (UnsupportedCharsetException e) {
+			throw new IOException("The charset " + charsetName + " is not supported.", e);
+		}
+
+		super.open(fs, path);
+	}
+
+	@Override
+	public void write(T element) throws IOException {
+		FSDataOutputStream outputStream = getStream();
+		outputStream.write(element.toString().getBytes(charset));
+		outputStream.write('\n');
+	}
+
+	@Override
+	public Writer<T> duplicate() {
+		return new StringWriter<>(charsetName);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
new file mode 100644
index 0000000..41663df
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+
+/**
+ * A {@link Clock} that simply returns {@link System#currentTimeMillis()}, i.e. the wall-clock time of the local system.
+ */
+public class SystemClock implements Clock {
+	@Override
+	public long currentTimeMillis() {
+		return System.currentTimeMillis();
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
new file mode 100644
index 0000000..c3b4cb6
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs;
+
+import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * An implementation of {@code Writer} is used in conjunction with a
+ * {@link BucketingSink} to perform the actual writing to the bucket files.
+ * Each bucket file is written between a call to {@link #open} and a call to {@link #close}.
+ *
+ * @param <T> The type of the elements that are being written by the sink.
+ */
+public interface Writer<T> extends Serializable {
+
+	/**
+	 * Initializes the {@code Writer} for a newly opened bucket file.
+	 * Any internal per-bucket initialization should be performed here.
+	 *
+	 * @param fs The {@link org.apache.hadoop.fs.FileSystem} containing the newly opened file.
+	 * @param path The {@link org.apache.hadoop.fs.Path} of the newly opened file.
+	 */
+	void open(FileSystem fs, Path path) throws IOException;
+
+	/**
+	 * Flushes out any internally held data.
+	 * @return The offset that the file must be truncated to at recovery.
+	 */
+	long flush() throws IOException;
+
+	/**
+	 * Retrieves the current position, and thus size, of the output file.
+	 */
+	long getPos() throws IOException;
+
+	/**
+	 * Closes the {@code Writer}. If the writer is already closed, no action will be
+	 * taken. The call should close all state related to the current output file,
+	 * including the output stream opened in {@code open}.
+	 */
+	void close() throws IOException;
+
+	/**
+	 * Writes one element to the bucket file.
+	 */
+	void write(T element) throws IOException;
+
+	/**
+	 * Duplicates the {@code Writer}. This is used to get one {@code Writer} for each
+	 * parallel instance of the sink.
+	 */
+	Writer<T> duplicate();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
new file mode 100644
index 0000000..0bf14b3
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link Bucketer} that does not perform any bucketing: every element is
+ * assigned to the base path, so all files are written directly into it.
+ */
+public class BasePathBucketer<T> implements Bucketer<T> {
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public Path getBucketPath(Clock clock, Path basePath, T element) {
+		return basePath; // clock and element are ignored: there is only one bucket
+	}
+
+	@Override
+	public String toString() {
+		return "BasePathBucketer";
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
new file mode 100644
index 0000000..86aa9f3
--- /dev/null
+++ b/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.fs.bucketing;
+
+import org.apache.flink.streaming.connectors.fs.Clock;
+import org.apache.hadoop.fs.Path;
+
+import java.io.Serializable;
+
+/**
+ * A bucketer is used with a {@link BucketingSink}
+ * to put emitted elements into rolling files.
+ *
+ * <p>The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
+ * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
+ * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
+ * system time.
+ */
+public interface Bucketer<T> extends Serializable {
+	/**
+	 * Returns the {@link Path} of the bucket that the given element belongs to.
+	 *
+	 * @param clock The {@link Clock} to query for the current time, e.g. for time-based bucketing.
+	 * @param basePath The base path containing all the buckets.
+	 * @param element The current element being processed.
+	 *
+	 * @return The complete {@code Path} of the bucket which the provided element should fall in. This
+	 * should include the {@code basePath} and also the {@code subtaskIndex} to avoid clashes with
+	 * parallel sinks.
+	 */
+	Path getBucketPath(Clock clock, Path basePath, T element);
+}
