/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.fs;

import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Sink that emits its input elements to rolling {@link org.apache.hadoop.fs.FileSystem} files. This
 * is integrated with the checkpointing mechanism to provide exactly once semantics.
 *
 * <p>
 * When creating the sink a {@code basePath} must be specified. The base directory contains
 * one directory for every bucket. The bucket directories themselves contain several part files.
 * These contain the actual written data.
 *
 * <p>
 * The sink uses a {@link Bucketer} to determine the name of bucket directories inside the
 * base directory. Whenever the {@code Bucketer} returns a different directory name than
 * it returned before, the sink will close the current part files inside that bucket
 * and start the new bucket directory. The default bucketer is a {@link DateTimeBucketer} with
 * date format string {@code "yyyy-MM-dd--HH"}. You can specify a custom {@code Bucketer}
 * using {@link #setBucketer(Bucketer)}. For example, use
 * {@link NonRollingBucketer} if you don't want to have
 * buckets but still write part files in a fault-tolerant way.
 *
 * <p>
 * The filenames of the part files contain the part prefix, the parallel subtask index of the sink
 * and a rolling counter, for example {@code "part-1-17"}. Per default the part prefix is
 * {@code "part"} but this can be
 * configured using {@link #setPartPrefix(String)}. When a part file becomes bigger
 * than the batch size the current part file is closed, the part counter is increased and
 * a new part file is created. The batch size defaults to {@code 384MB}, this can be configured
 * using {@link #setBatchSize(long)}.
 *
 * <p>
 * Part files can be in one of three states: in-progress, pending or finished. The reason for this
 * is how the sink works together with the checkpointing mechanism to provide exactly-once semantics
 * and fault-tolerance. The part file that is currently being written to is in-progress. Once
 * a part file is closed for writing it becomes pending. When a checkpoint is successful the
 * currently pending files will be moved to finished. If a failure occurs the pending files
 * will be deleted to reset state to the last checkpoint. The data in in-progress files will
 * also have to be rolled back. If the {@code FileSystem} supports the {@code truncate} call
 * this will be used to reset the file back to a previous state. If not, a special file
 * with the same name as the part file and the suffix {@code ".valid-length"} will be written
 * that contains the length up to which the file contains valid data. When reading the file
 * it must be ensured that it is only read up to that point. The prefixes and suffixes for
 * the different file states and valid-length files can be configured, for example with
 * {@link #setPendingSuffix(String)}.
 *
 * <p>
 * Note: If checkpointing is not enabled the pending files will never be moved to the finished state.
 * In that case, the pending suffix/prefix can be set to {@code ""} to make the sink work
 * in a non-fault-tolerant way but still provide output without prefixes and suffixes.
 *
 * <p>
 * The part files are written using an instance of {@link Writer}. By default
 * {@link org.apache.flink.streaming.connectors.fs.StringWriter} is used, which writes the result
 * of {@code toString()} for every element, separated by newlines. You can configure the writer
 * using {@link #setWriter(Writer)}. For example,
 * {@link org.apache.flink.streaming.connectors.fs.SequenceFileWriter} can be used to write
 * Hadoop {@code SequenceFiles}.
 *
 * <p>
 * Example:
 *
 * <pre>{@code
 *     new RollingSink<Tuple2<IntWritable, Text>>(outPath)
 *         .setWriter(new SequenceFileWriter<IntWritable, Text>())
 *         .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
 * }</pre>
 *
 * This will create a sink that writes to {@code SequenceFiles} and rolls every minute.
 *
 * @see DateTimeBucketer
 * @see StringWriter
 * @see SequenceFileWriter
 *
 * @param <T> Type of the elements emitted by this sink
 *
 * @deprecated use {@link BucketingSink} instead.
 */
@Deprecated
public class RollingSink<T> extends RichSinkFunction<T>
		implements InputTypeConfigurable, CheckpointedFunction, CheckpointListener {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(RollingSink.class);


	// --------------------------------------------------------------------------------------------
	//  User configuration values
	// --------------------------------------------------------------------------------------------
	// These are initialized with some defaults but are meant to be changeable by the user.
	// The defaults themselves are compile-time constants and therefore static: they are not
	// part of the per-instance (serialized) state of the sink.

	/**
	 * The default maximum size of part files (currently {@code 384 MB}).
	 */
	private static final long DEFAULT_BATCH_SIZE = 1024L * 1024L * 384L;

	/**
	 * This is used for part files that we are writing to but which were not yet confirmed
	 * by a checkpoint.
	 */
	private static final String DEFAULT_IN_PROGRESS_SUFFIX = ".in-progress";

	/**
	 * See above, but for prefix.
	 */
	private static final String DEFAULT_IN_PROGRESS_PREFIX = "_";

	/**
	 * This is used for part files that we are not writing to but which are not yet confirmed by
	 * checkpoint.
	 */
	private static final String DEFAULT_PENDING_SUFFIX = ".pending";

	/**
	 * See above, but for prefix.
	 */
	private static final String DEFAULT_PENDING_PREFIX = "_";

	/**
	 * When truncate() is not supported on the used FileSystem we instead write a
	 * file along the part file with this ending that contains the length up to which
	 * the part file is valid.
	 */
	private static final String DEFAULT_VALID_SUFFIX = ".valid-length";

	/**
	 * See above, but for prefix.
	 */
	private static final String DEFAULT_VALID_PREFIX = "_";

	/**
	 * The default prefix for part files.
	 */
	private static final String DEFAULT_PART_PREFIX = "part";

	/**
	 * The default timeout for asynchronous operations such as recoverLease and truncate. In
	 * milliseconds.
	 */
	private static final long DEFAULT_ASYNC_TIMEOUT_MS = 60 * 1000;

	/**
	 * Pause between two polls while waiting for lease recovery or truncate to finish. In
	 * milliseconds.
	 */
	private static final long ASYNC_POLL_INTERVAL_MS = 500;


	/**
	 * The base {@code Path} that stores all bucket directories.
	 */
	private final String basePath;

	/**
	 * The {@code Bucketer} that is used to determine the path of bucket directories.
	 */
	private Bucketer bucketer;

	/**
	 * We have a template and call duplicate() for each parallel writer in open() to get the actual
	 * writer that is used for the part files.
	 */
	private Writer<T> writerTemplate;

	/**
	 * The actual writer that we use for writing the part files.
	 */
	private Writer<T> writer;

	/**
	 * Maximum size of part files. If files exceed this we close and create a new one in the same
	 * bucket directory.
	 */
	private long batchSize;

	// These are the actually configured prefixes/suffixes
	private String inProgressSuffix = DEFAULT_IN_PROGRESS_SUFFIX;
	private String inProgressPrefix = DEFAULT_IN_PROGRESS_PREFIX;

	private String pendingSuffix = DEFAULT_PENDING_SUFFIX;
	private String pendingPrefix = DEFAULT_PENDING_PREFIX;

	private String validLengthSuffix = DEFAULT_VALID_SUFFIX;
	private String validLengthPrefix = DEFAULT_VALID_PREFIX;

	private String partPrefix = DEFAULT_PART_PREFIX;

	/**
	 * The timeout for asynchronous operations such as recoverLease and truncate. In
	 * milliseconds.
	 */
	private long asyncTimeout = DEFAULT_ASYNC_TIMEOUT_MS;

	// --------------------------------------------------------------------------------------------
	//  Internal fields (not configurable by user)
	// --------------------------------------------------------------------------------------------


	/**
	 * The part file that we are currently writing to. {@code null} while no part file is
	 * waiting to be moved to the pending state.
	 */
	private transient Path currentPartPath;

	/**
	 * The bucket directory that we are currently filling.
	 */
	private transient Path currentBucketDirectory;

	/**
	 * For counting the part files inside a bucket directory. Part files follow the pattern
	 * {@code "{part-prefix}-{subtask}-{count}"}. When creating new part files we increase the counter.
	 */
	private transient int partCounter;

	/**
	 * Tracks if the writer is currently opened or closed.
	 */
	private transient boolean isWriterOpen;

	/**
	 * We use reflection to get the .truncate() method, this is only available starting with
	 * Hadoop 2.7.
	 */
	private transient Method refTruncate;

	/**
	 * The state object that is handled by flink from snapshot/restore. In there we store the
	 * current part file path, the valid length of the in-progress files and pending part files.
	 */
	private transient BucketState bucketState;

	/**
	 * The operator list state that {@link #bucketState} is written to on every snapshot and
	 * read back from on restore.
	 */
	private transient ListState<BucketState> restoredBucketStates;

	/**
	 * User-defined FileSystem parameters.
	 */
	private Configuration fsConfig;

	/**
	 * The FileSystem reference.
	 */
	private transient FileSystem fs;

	/**
	 * Creates a new {@code RollingSink} that writes files to the given base directory.
	 *
	 * <p>
	 * This uses a {@link DateTimeBucketer} as bucketer and a {@link StringWriter} as writer.
	 * The maximum bucket size is set to 384 MB.
	 *
	 * @param basePath The directory to which to write the bucket files.
	 */
	public RollingSink(String basePath) {
		this.basePath = basePath;
		this.bucketer = new DateTimeBucketer();
		this.batchSize = DEFAULT_BATCH_SIZE;
		this.writerTemplate = new StringWriter<>();
	}

	/**
	 * Specify a custom {@code Configuration} that will be used when creating
	 * the {@link FileSystem} for writing. The entries are copied into a fresh configuration.
	 *
	 * @param config The Flink configuration whose entries are copied.
	 * @return This sink, for call chaining.
	 */
	public RollingSink<T> setFSConfig(Configuration config) {
		this.fsConfig = new Configuration();
		fsConfig.addAll(config);
		return this;
	}

	/**
	 * Specify a custom {@code Configuration} that will be used when creating
	 * the {@link FileSystem} for writing. Every entry of the Hadoop configuration is copied
	 * as a string into a fresh Flink configuration.
	 *
	 * @param config The Hadoop configuration whose entries are copied.
	 * @return This sink, for call chaining.
	 */
	public RollingSink<T> setFSConfig(org.apache.hadoop.conf.Configuration config) {
		this.fsConfig = new Configuration();
		for (Map.Entry<String, String> entry : config) {
			fsConfig.setString(entry.getKey(), entry.getValue());
		}
		return this;
	}

	/**
	 * Forwards the input type to the writer template if that writer is itself
	 * {@link InputTypeConfigurable}; otherwise does nothing.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
		if (this.writerTemplate instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) writerTemplate).setInputType(type, executionConfig);
		}
	}

	/**
	 * Creates the file system, probes for {@code truncate()} support, registers the
	 * {@code "rolling-states"} list state and, if the context reports a restore, processes every
	 * restored {@link BucketState}.
	 *
	 * @throws IllegalArgumentException if this method was already called on this instance.
	 */
	@Override
	public void initializeState(FunctionInitializationContext context) throws Exception {
		Preconditions.checkArgument(this.restoredBucketStates == null,
			"The " + getClass().getSimpleName() + " has already been initialized.");

		initFileSystem();

		if (this.refTruncate == null) {
			this.refTruncate = reflectTruncate(fs);
		}

		OperatorStateStore stateStore = context.getOperatorStateStore();
		restoredBucketStates = stateStore.getSerializableListState("rolling-states");

		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
		if (context.isRestored()) {
			LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);

			for (BucketState restoredState : restoredBucketStates.get()) {
				// Log the restored element itself (before it is processed and invalidated), not
				// the 'bucketState' field, which is only assigned in open().
				if (LOG.isDebugEnabled()) {
					LOG.debug("{} (taskIdx= {}) restored {}", getClass().getSimpleName(), subtaskIndex, restoredState);
				}
				handleRestoredBucketState(restoredState);
			}
		} else {
			LOG.info("No state to restore for the {} (taskIdx= {}).", getClass().getSimpleName(), subtaskIndex);
		}
	}

	/**
	 * Resets the part counter, duplicates the writer template and starts with a fresh
	 * {@link BucketState}.
	 */
	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);

		partCounter = 0;

		this.writer = writerTemplate.duplicate();

		bucketState = new BucketState();
	}

	/**
	 * Creates the file system for {@link #basePath}, applying the user-defined configuration
	 * (if any) on top of the Hadoop configuration. Does nothing if the file system already exists.
	 *
	 * @throws IOException if the file system cannot be obtained.
	 */
	private void initFileSystem() throws IOException {
		if (fs != null) {
			return;
		}

		org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
		Path base = new Path(basePath);
		if (fsConfig != null) {
			// with a user-defined configuration, the FileSystem cache is disabled for this scheme
			String disableCacheName = String.format("fs.%s.impl.disable.cache", base.toUri().getScheme());
			hadoopConf.setBoolean(disableCacheName, true);
			for (String key : fsConfig.keySet()) {
				hadoopConf.set(key, fsConfig.getString(key, null));
			}
		}

		fs = base.getFileSystem(hadoopConf);
	}

	/**
	 * Closes the current part file and moves it to the pending state.
	 */
	@Override
	public void close() throws Exception {
		closeCurrentPartFile();
	}

	/**
	 * Writes the element to the current part file, rolling over to a new part file first if
	 * {@link #shouldRoll()} says so.
	 */
	@Override
	public void invoke(T value) throws Exception {
		if (shouldRoll()) {
			openNewPartFile();
		}
		writer.write(value);
	}

	/**
	 * Determines whether we should change the bucket file we are writing to.
	 *
	 * <p>
	 * This will roll if no file was created yet, if the file size is larger than the specified size
	 * or if the {@code Bucketer} determines that we should roll. When the bucketer asks for a new
	 * bucket the part counter is reset as a side effect.
	 */
	private boolean shouldRoll() throws IOException {
		boolean shouldRoll = false;
		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
		if (!isWriterOpen) {
			shouldRoll = true;
			LOG.debug("RollingSink {} starting new initial bucket. ", subtaskIndex);
		}
		if (bucketer.shouldStartNewBucket(new Path(basePath), currentBucketDirectory)) {
			shouldRoll = true;
			LOG.debug("RollingSink {} starting new bucket because {} said we should. ", subtaskIndex, bucketer);
			// we will retrieve a new bucket base path in openNewPartFile so reset the part counter
			partCounter = 0;
		}
		if (isWriterOpen) {
			long writePosition = writer.getPos();
			if (writePosition > batchSize) {
				shouldRoll = true;
				LOG.debug(
					"RollingSink {} starting new bucket because file position {} is above batch size {}.",
					subtaskIndex,
					writePosition,
					batchSize);
			}
		}
		return shouldRoll;
	}

	/**
	 * Opens a new part file.
	 *
	 * <p>
	 * This closes the old bucket file and retrieves a new bucket path from the {@code Bucketer}.
	 * The part counter is advanced past every name for which a final, pending or in-progress
	 * file already exists.
	 */
	private void openNewPartFile() throws Exception {
		closeCurrentPartFile();

		Path newBucketDirectory = bucketer.getNextBucketPath(new Path(basePath));

		if (!newBucketDirectory.equals(currentBucketDirectory)) {
			currentBucketDirectory = newBucketDirectory;
			try {
				if (fs.mkdirs(currentBucketDirectory)) {
					LOG.debug("Created new bucket directory: {}", currentBucketDirectory);
				}
			} catch (IOException e) {
				throw new RuntimeException("Could not create base path for new rolling file.", e);
			}
		}

		int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
		currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);

		// This should work since there is only one parallel subtask that tries names with
		// our subtask id. Otherwise we would run into concurrency issues here.
		while (fs.exists(currentPartPath) ||
				fs.exists(getPendingPathFor(currentPartPath)) ||
				fs.exists(getInProgressPathFor(currentPartPath))) {
			partCounter++;
			currentPartPath = new Path(currentBucketDirectory, partPrefix + "-" + subtaskIndex + "-" + partCounter);
		}

		// increase, so we don't have to check for this name next time
		partCounter++;

		LOG.debug("Next part path is {}", currentPartPath.toString());

		Path inProgressPath = getInProgressPathFor(currentPartPath);
		writer.open(fs, inProgressPath);
		isWriterOpen = true;
	}

	/** Returns the pending-state path (configured prefix and suffix applied) for a part file. */
	private Path getPendingPathFor(Path path) {
		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
	}

	/** Returns the in-progress path (configured prefix and suffix applied) for a part file. */
	private Path getInProgressPathFor(Path path) {
		return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
	}

	/** Returns the path of the valid-length marker file that belongs to a part file. */
	private Path getValidLengthPathFor(Path path) {
		return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
	}

	/**
	 * Closes the current part file.
	 *
	 * <p>
	 * This moves the current in-progress part file to a pending file and adds it to the list
	 * of pending files in our bucket state. Calling this method again without opening a new
	 * part file in between is a no-op.
	 */
	private void closeCurrentPartFile() throws Exception {
		if (isWriterOpen) {
			writer.close();
			isWriterOpen = false;
		}

		if (currentPartPath != null) {
			Path inProgressPath = getInProgressPathFor(currentPartPath);
			Path pendingPath = getPendingPathFor(currentPartPath);
			fs.rename(inProgressPath, pendingPath);
			LOG.debug("Moving in-progress bucket {} to pending file {}", inProgressPath, pendingPath);
			this.bucketState.pendingFiles.add(currentPartPath.toString());
			// Forget the part file once it is pending. Otherwise a second call (for example
			// close() after openNewPartFile() failed half-way) would rename it again and
			// register the same file as pending twice.
			currentPartPath = null;
		}
	}

	/**
	 * Gets the truncate() call using reflection and verifies on a throw-away test file that
	 * it can actually be invoked on the given file system.
	 *
	 * <p>
	 * <b>NOTE: </b>This code comes from Flume
	 *
	 * @param fs The file system to probe, may be {@code null}.
	 * @return The {@code truncate(Path, long)} method, or {@code null} if {@code fs} is
	 *         {@code null}, the method does not exist or invoking it failed.
	 * @throws RuntimeException if the test file cannot be created or deleted.
	 */
	private Method reflectTruncate(FileSystem fs) {
		if (fs == null) {
			return null;
		}

		Method m;
		try {
			m = fs.getClass().getMethod("truncate", Path.class, long.class);
		} catch (NoSuchMethodException ex) {
			LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
				" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
			return null;
		}

		// verify that truncate actually works
		Path testPath = new Path(UUID.randomUUID().toString());
		// try-with-resources: the stream is closed even if writing the test content fails
		try (FSDataOutputStream outputStream = fs.create(testPath)) {
			outputStream.writeUTF("hello");
		} catch (IOException e) {
			LOG.error("Could not create file for checking if truncate works.", e);
			throw new RuntimeException("Could not create file for checking if truncate works.", e);
		}

		try {
			m.invoke(fs, testPath, 2L);
		} catch (IllegalAccessException | InvocationTargetException e) {
			LOG.debug("Truncate is not supported.", e);
			m = null;
		}

		try {
			fs.delete(testPath, false);
		} catch (IOException e) {
			LOG.error("Could not delete truncate test file.", e);
			throw new RuntimeException("Could not delete truncate test file.", e);
		}
		return m;
	}

	/**
	 * Moves the pending files of all checkpoints up to and including {@code checkpointId}
	 * to their final location and forgets those checkpoints.
	 */
	@Override
	public void notifyCheckpointComplete(long checkpointId) throws Exception {
		synchronized (bucketState.pendingFilesPerCheckpoint) {
			Iterator<Map.Entry<Long, List<String>>> pendingCheckpointsIt =
				bucketState.pendingFilesPerCheckpoint.entrySet().iterator();

			while (pendingCheckpointsIt.hasNext()) {
				Map.Entry<Long, List<String>> entry = pendingCheckpointsIt.next();
				Long pastCheckpointId = entry.getKey();

				if (pastCheckpointId <= checkpointId) {
					LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
					// All the pending files are buckets that have been completed but are waiting to be renamed
					// to their final name
					for (String filename : entry.getValue()) {
						Path finalPath = new Path(filename);
						Path pendingPath = getPendingPathFor(finalPath);

						fs.rename(pendingPath, finalPath);
						LOG.debug("Moving pending file {} to final location after complete checkpoint {}.",
							pendingPath, pastCheckpointId);
					}
					pendingCheckpointsIt.remove();
				}
			}
		}
	}

	/**
	 * Records the current in-progress file and its flushed length, files the pending part files
	 * under the checkpoint id and stores the resulting {@link BucketState} in the list state.
	 *
	 * @throws NullPointerException if {@link #initializeState} has not been called before.
	 */
	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		Preconditions.checkNotNull(restoredBucketStates,
			"The " + getClass().getSimpleName() + " has not been properly initialized.");

		int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();

		if (isWriterOpen) {
			bucketState.currentFile = currentPartPath.toString();
			bucketState.currentFileValidLength = writer.flush();
		}

		synchronized (bucketState.pendingFilesPerCheckpoint) {
			bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
		}
		bucketState.pendingFiles = new ArrayList<>();

		restoredBucketStates.clear();
		restoredBucketStates.add(bucketState);

		if (LOG.isDebugEnabled()) {
			LOG.debug("{} (taskIdx={}) checkpointed {}.", getClass().getSimpleName(), subtaskIdx, bucketState);
		}
	}

	/**
	 * Brings the files referenced by a restored state back to a consistent layout: the file that
	 * was in progress at checkpoint time is moved to its final name and cut back (or marked) to its
	 * valid length, and files that were pending for already taken checkpoints are moved to their
	 * final location.
	 *
	 * @param restoredState The state read back from the checkpoint; it is invalidated in place.
	 * @throws RuntimeException if a file system operation or the reflective truncate call fails.
	 */
	private void handleRestoredBucketState(BucketState restoredState) {
		// we can clean all the pending files since they were renamed to
		// final files after this checkpoint was successful
		// (we re-start from the last **successful** checkpoint)
		restoredState.pendingFiles.clear();

		if (restoredState.currentFile != null) {
			// We were writing to a file when the last checkpoint occurred. This file can either
			// be still in-progress or became a pending file at some point after the checkpoint.
			// Either way, we have to truncate it back to a valid state (or write a .valid-length
			// file that specifies up to which length it is valid) and rename it to the final name
			// before starting a new bucket file.
			Path partPath = new Path(restoredState.currentFile);
			try {
				Path partPendingPath = getPendingPathFor(partPath);
				Path partInProgressPath = getInProgressPathFor(partPath);

				if (fs.exists(partPendingPath)) {
					LOG.debug("In-progress file {} has been moved to pending after checkpoint, moving to final location.", partPath);
					// has been moved to pending in the mean time, rename to final location
					fs.rename(partPendingPath, partPath);
				} else if (fs.exists(partInProgressPath)) {
					LOG.debug("In-progress file {} is still in-progress, moving to final location.", partPath);
					// it was still in progress, rename to final path
					fs.rename(partInProgressPath, partPath);
				} else if (fs.exists(partPath)) {
					LOG.debug("In-Progress file {} was already moved to final location {}.", restoredState.currentFile, partPath);
				} else {
					LOG.debug("In-Progress file {} was neither moved to pending nor is still in progress. Possibly, " +
						"it was moved to final location by a previous snapshot restore", restoredState.currentFile);
				}

				if (this.refTruncate == null) {
					this.refTruncate = reflectTruncate(fs);
				}

				// truncate it or write a ".valid-length" file to specify up to which point it is valid
				if (refTruncate != null) {
					LOG.debug("Truncating {} to valid length {}", partPath, restoredState.currentFileValidLength);
					// some-one else might still hold the lease from a previous try, we are
					// recovering, after all ...
					if (fs instanceof DistributedFileSystem) {
						recoverLeaseAndAwaitClose((DistributedFileSystem) fs, partPath);
					}
					Boolean truncated = (Boolean) refTruncate.invoke(fs, partPath, restoredState.currentFileValidLength);
					if (!truncated) {
						LOG.debug("Truncate did not immediately complete for {}, waiting...", partPath);

						// we must wait for the asynchronous truncate operation to complete
						awaitTruncateCompletion(partPath, restoredState.currentFileValidLength);
					}
				} else {
					writeValidLengthFile(partPath, restoredState.currentFileValidLength);
				}

				// invalidate in the state object
				restoredState.currentFile = null;
				restoredState.currentFileValidLength = -1;
				isWriterOpen = false;
			} catch (IOException e) {
				LOG.error("Error while restoring RollingSink state.", e);
				throw new RuntimeException("Error while restoring RollingSink state.", e);
			} catch (InvocationTargetException | IllegalAccessException e) {
				LOG.error("Could not invoke truncate.", e);
				throw new RuntimeException("Could not invoke truncate.", e);
			}
		}

		// Move files that are confirmed by a checkpoint but did not get moved to final location
		// because the checkpoint notification did not happen before a failure

		LOG.debug("Moving pending files to final location on restore.");
		for (Map.Entry<Long, List<String>> pastCheckpoint : restoredState.pendingFilesPerCheckpoint.entrySet()) {
			Long pastCheckpointId = pastCheckpoint.getKey();
			// All the pending files are buckets that have been completed but are waiting to be renamed
			// to their final name
			for (String filename : pastCheckpoint.getValue()) {
				Path finalPath = new Path(filename);
				Path pendingPath = getPendingPathFor(finalPath);

				try {
					if (fs.exists(pendingPath)) {
						LOG.debug("(RESTORE) Moving pending file {} to final location after complete checkpoint {}.", pendingPath, pastCheckpointId);
						fs.rename(pendingPath, finalPath);
					}
				} catch (IOException e) {
					LOG.error("(RESTORE) Error while renaming pending file {} to final path {}: {}", pendingPath, finalPath, e);
					throw new RuntimeException("Error while renaming pending file " + pendingPath + " to final path " + finalPath, e);
				}
			}
		}

		synchronized (restoredState.pendingFilesPerCheckpoint) {
			restoredState.pendingFilesPerCheckpoint.clear();
		}
	}

	/**
	 * Starts lease recovery for the file and polls until the file is reported closed, the
	 * {@link #asyncTimeout} has elapsed or the thread is interrupted. Returns normally in all
	 * three cases.
	 */
	private void recoverLeaseAndAwaitClose(DistributedFileSystem dfs, Path partPath) throws IOException {
		LOG.debug("Trying to recover file lease {}", partPath);
		dfs.recoverLease(partPath);
		boolean isClosed = dfs.isFileClosed(partPath);
		StopWatch sw = new StopWatch();
		sw.start();
		while (!isClosed) {
			if (sw.getTime() > asyncTimeout || !sleepBetweenPolls()) {
				break;
			}
			isClosed = dfs.isFileClosed(partPath);
		}
	}

	/**
	 * Polls the file length until it equals {@code validLength}, the {@link #asyncTimeout} has
	 * elapsed or the thread is interrupted.
	 *
	 * @throws RuntimeException if the file does not have the expected length when polling stops.
	 */
	private void awaitTruncateCompletion(Path partPath, long validLength) throws IOException {
		StopWatch sw = new StopWatch();
		sw.start();
		long newLen = fs.getFileStatus(partPath).getLen();
		while (newLen != validLength) {
			if (sw.getTime() > asyncTimeout || !sleepBetweenPolls()) {
				break;
			}
			newLen = fs.getFileStatus(partPath).getLen();
		}
		if (newLen != validLength) {
			throw new RuntimeException("Truncate did not truncate to right length. Should be " + validLength + " is " + newLen + ".");
		}
	}

	/**
	 * Writes the valid-length marker file for a part file, unless such a marker already exists.
	 */
	private void writeValidLengthFile(Path partPath, long validLength) throws IOException {
		LOG.debug("Writing valid-length file for {} to specify valid length {}", partPath, validLength);
		Path validLengthFilePath = getValidLengthPathFor(partPath);
		if (!fs.exists(validLengthFilePath)) {
			// try-with-resources: the stream is closed even if writing the length fails
			try (FSDataOutputStream lengthFileOut = fs.create(validLengthFilePath)) {
				lengthFileOut.writeUTF(Long.toString(validLength));
			}
		}
	}

	/**
	 * Sleeps for one poll interval.
	 *
	 * @return {@code true} if the sleep completed, {@code false} if the thread was interrupted.
	 *         In the latter case the interrupt flag is set again so that callers further up
	 *         can still observe the interruption.
	 */
	private static boolean sleepBetweenPolls() {
		try {
			Thread.sleep(ASYNC_POLL_INTERVAL_MS);
			return true;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		}
	}

	// --------------------------------------------------------------------------------------------
	//  Setters for User configuration values
	// --------------------------------------------------------------------------------------------

	/**
	 * Sets the maximum bucket size in bytes.
	 *
	 * <p>
	 * When a bucket part file becomes larger than this size a new bucket part file is started and
	 * the old one is closed. The name of the bucket files depends on the {@link Bucketer}.
	 *
	 * @param batchSize The bucket part file size in bytes.
	 */
	public RollingSink<T> setBatchSize(long batchSize) {
		this.batchSize = batchSize;
		return this;
	}

	/**
	 * Sets the {@link Bucketer} to use for determining the bucket files to write to.
	 *
	 * @param bucketer The bucketer to use.
	 */
	public RollingSink<T> setBucketer(Bucketer bucketer) {
		this.bucketer = bucketer;
		return this;
	}

	/**
	 * Sets the {@link Writer} to be used for writing the incoming elements to bucket files.
	 *
	 * @param writer The {@code Writer} to use.
	 */
	public RollingSink<T> setWriter(Writer<T> writer) {
		this.writerTemplate = writer;
		return this;
	}

	/**
	 * Sets the suffix of in-progress part files. The default is {@code ".in-progress"}.
	 */
	public RollingSink<T> setInProgressSuffix(String inProgressSuffix) {
		this.inProgressSuffix = inProgressSuffix;
		return this;
	}

	/**
	 * Sets the prefix of in-progress part files. The default is {@code "_"}.
	 */
	public RollingSink<T> setInProgressPrefix(String inProgressPrefix) {
		this.inProgressPrefix = inProgressPrefix;
		return this;
	}

	/**
	 * Sets the suffix of pending part files. The default is {@code ".pending"}.
	 */
	public RollingSink<T> setPendingSuffix(String pendingSuffix) {
		this.pendingSuffix = pendingSuffix;
		return this;
	}

	/**
	 * Sets the prefix of pending part files. The default is {@code "_"}.
	 */
	public RollingSink<T> setPendingPrefix(String pendingPrefix) {
		this.pendingPrefix = pendingPrefix;
		return this;
	}

	/**
	 * Sets the suffix of valid-length files. The default is {@code ".valid-length"}.
	 */
	public RollingSink<T> setValidLengthSuffix(String validLengthSuffix) {
		this.validLengthSuffix = validLengthSuffix;
		return this;
	}

	/**
	 * Sets the prefix of valid-length files. The default is {@code "_"}.
	 */
	public RollingSink<T> setValidLengthPrefix(String validLengthPrefix) {
		this.validLengthPrefix = validLengthPrefix;
		return this;
	}

	/**
	 * Sets the prefix of part files. The default is {@code "part"}.
	 */
	public RollingSink<T> setPartPrefix(String partPrefix) {
		this.partPrefix = partPrefix;
		return this;
	}

	/**
	 * Disable cleanup of leftover in-progress/pending files when the sink is opened.
	 *
	 * <p>
	 * This should only be disabled if using the sink without checkpoints, to not remove
	 * the files already in the directory.
	 *
	 * @deprecated This option is deprecated and remains only for backwards compatibility.
	 * We do not clean up lingering files anymore.
	 */
	@Deprecated
	public RollingSink<T> disableCleanupOnOpen() {
		return this;
	}

	/**
	 * Sets the default timeout for asynchronous operations such as recoverLease and truncate.
	 *
	 * @param timeout The timeout, in milliseconds.
	 */
	public RollingSink<T> setAsyncTimeout(long timeout) {
		this.asyncTimeout = timeout;
		return this;
	}

	// --------------------------------------------------------------------------------------------
	//  Internal Classes
	// --------------------------------------------------------------------------------------------

	/**
	 * This is used for keeping track of the current in-progress files and files that we mark
	 * for moving from pending to final location after we get a checkpoint-complete notification.
	 */
	static final class BucketState implements Serializable {
		private static final long serialVersionUID = 1L;

		/**
		 * The file that was in-progress when the last checkpoint occurred.
		 */
		String currentFile;

		/**
		 * The valid length of the in-progress file at the time of the last checkpoint.
		 */
		long currentFileValidLength = -1;

		/**
		 * Pending files that accumulated since the last checkpoint.
		 */
		List<String> pendingFiles = new ArrayList<>();

		/**
		 * When doing a checkpoint we move the pending files since the last checkpoint to this map
		 * with the id of the checkpoint. When we get the checkpoint-complete notification we move
		 * pending files of completed checkpoints to their final location.
		 */
		final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();

		@Override
		public String toString() {
			return
				"In-progress=" + currentFile +
				" validLength=" + currentFileValidLength +
				" pendingForNextCheckpoint=" + pendingFiles +
				" pendingForPrevCheckpoints=" + pendingFilesPerCheckpoint;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java deleted file mode 100644 index 08c0d0a..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SequenceFileWriter.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.flink.streaming.connectors.fs;


import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.IOException;

/**
 * A {@link Writer} that writes the bucket files as Hadoop {@link SequenceFile SequenceFiles}.
 * The input to the {@link BucketingSink} must
 * be a {@link org.apache.flink.api.java.tuple.Tuple2} of two Hadoop
 * {@link org.apache.hadoop.io.Writable Writables}.
 *
 * <p>
 * The key and value classes are taken from the type information passed to
 * {@link #setInputType(TypeInformation, ExecutionConfig)}; {@link #open(FileSystem, Path)}
 * fails if they have not been set.
 *
 * @param <K> The type of the first tuple field.
 * @param <V> The type of the second tuple field.
 */
public class SequenceFileWriter<K extends Writable, V extends Writable> extends StreamWriterBase<Tuple2<K, V>> implements InputTypeConfigurable {
    private static final long serialVersionUID = 1L;

    /** Name of the Hadoop compression codec; the literal {@code "None"} means "no codec". */
    private final String compressionCodecName;

    private SequenceFile.CompressionType compressionType;

    /** The sequence file writer layered on top of the stream opened by the base class. */
    private transient SequenceFile.Writer writer;

    private Class<K> keyClass;

    private Class<V> valueClass;

    /**
     * Creates a new {@code SequenceFileWriter} that writes sequence files without compression.
     */
    public SequenceFileWriter() {
        this("None", SequenceFile.CompressionType.NONE);
    }

    /**
     * Creates a new {@code SequenceFileWriter} that writes sequence with the given
     * compression codec and compression type.
     *
     * @param compressionCodecName Name of a Hadoop Compression Codec.
     * @param compressionType The compression type to use.
     */
    public SequenceFileWriter(String compressionCodecName,
            SequenceFile.CompressionType compressionType) {
        this.compressionCodecName = compressionCodecName;
        this.compressionType = compressionType;
    }

    /**
     * Opens the part file and creates the {@link SequenceFile.Writer} on top of it.
     *
     * <p>
     * All checks that do not need the stream (key/value class present, codec resolvable) run
     * before the file is created, so that a misconfigured writer does not leave an open stream
     * and an empty file behind.
     *
     * @param fs The file system containing the new part file.
     * @param path The path of the new part file.
     * @throws IllegalStateException if the key or value class has not been initialized
     * @throws RuntimeException if the configured codec cannot be found
     * @throws IOException if the file or the sequence file writer cannot be created
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (keyClass == null) {
            throw new IllegalStateException("Key Class has not been initialized.");
        }
        if (valueClass == null) {
            throw new IllegalStateException("Value Class has not been initialized.");
        }

        CompressionCodec codec = null;

        Configuration conf = HadoopFileSystem.getHadoopConfiguration();

        if (!compressionCodecName.equals("None")) {
            CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
            codec = codecFactory.getCodecByName(compressionCodecName);
            if (codec == null) {
                throw new RuntimeException("Codec " + compressionCodecName + " not found.");
            }
        }

        super.open(fs, path);

        // the non-deprecated constructor syntax is only available in recent hadoop versions...
        writer = SequenceFile.createWriter(conf,
                getStream(),
                keyClass,
                valueClass,
                compressionType,
                codec);
    }

    /**
     * Closes the sequence file writer and then the underlying stream.
     *
     * <p>
     * The base class is closed in a {@code finally} block so that the output stream is
     * released even when closing the sequence file writer fails.
     *
     * @throws IOException if closing the sequence file writer or the stream fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (writer != null) {
                writer.close();
            }
        } finally {
            writer = null;
            super.close();
        }
    }

    /**
     * Appends the tuple as one key/value record.
     *
     * @param element The tuple whose {@code f0} is written as key and {@code f1} as value.
     * @throws IllegalStateException if the writer is not open
     * @throws IOException if appending to the sequence file fails
     */
    @Override
    public void write(Tuple2<K, V> element) throws IOException {
        getStream(); // Throws if the stream is not open
        writer.append(element.f0, element.f1);
    }

    /**
     * Derives the key and value classes from the input type, which must be a tuple of arity 2.
     *
     * @param type The type information of the sink input.
     * @param executionConfig The execution config; not used by this method.
     * @throws IllegalArgumentException if {@code type} is not a tuple type of arity 2
     */
    @Override
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        if (!type.isTupleType()) {
            throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
        }

        TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;

        if (tupleType.getArity() != 2) {
            throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
        }

        TypeInformation<K> keyType = tupleType.getTypeAt(0);
        TypeInformation<V> valueType = tupleType.getTypeAt(1);

        this.keyClass = keyType.getTypeClass();
        this.valueClass = valueType.getTypeClass();
    }

    /**
     * Creates an unopened copy that carries over the codec name, compression type and the
     * key/value classes.
     */
    @Override
    public Writer<Tuple2<K, V>> duplicate() {
        SequenceFileWriter<K, V> result = new SequenceFileWriter<>(compressionCodecName, compressionType);
        result.keyClass = keyClass;
        result.valueClass = valueClass;
        return result;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java deleted file mode 100644 index 140246f..0000000 ---
a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StreamWriterBase.java +++ /dev/null @@ -1,152 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/**
 * Base class for {@link Writer Writers} that write to a {@link FSDataOutputStream}.
 *
 * <p>
 * The class owns the output stream of the current part file: {@link #open(FileSystem, Path)}
 * creates it, {@link #flush()} pushes data out via {@code hflush}/{@code sync}, and
 * {@link #close()} flushes and releases it.
 */
public abstract class StreamWriterBase<T> implements Writer<T> {

    // NOTE(review): the logger is registered under BucketingSink rather than this class;
    // kept unchanged so that existing logging configuration keeps matching.
    private static final Logger LOG = LoggerFactory.getLogger(BucketingSink.class);

    /**
     * The {@code FSDataOutputStream} for the current part file.
     */
    private transient FSDataOutputStream outStream;

    /**
     * We use reflection to get the hflush method or use sync as a fallback.
     * The idea for this and the code comes from the Flume HDFS Sink.
     */
    private transient Method refHflushOrSync;

    /**
     * Returns the current output stream, if the stream is open.
     *
     * @return the stream of the current part file
     * @throws IllegalStateException if no stream is open
     */
    protected FSDataOutputStream getStream() {
        if (outStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return outStream;
    }

    /**
     * If hflush is available in this version of HDFS, then this method calls
     * hflush, else it calls sync.
     *
     * <p>
     * Note: This code comes from Flume
     *
     * @param os - The stream to flush/sync
     * @throws java.io.IOException if the reflectively invoked method failed with an
     *         {@code IOException}
     * @throws RuntimeException for every other failure of the reflective call
     */
    protected void hflushOrSync(FSDataOutputStream os) throws IOException {
        try {
            // At this point the refHflushOrSync cannot be null,
            // since register method would have thrown if it was.
            this.refHflushOrSync.invoke(os);
        } catch (InvocationTargetException e) {
            String msg = "Error while trying to hflushOrSync!";
            LOG.error(msg + " " + e.getCause());
            Throwable cause = e.getCause();
            // instanceof is already false for null, no separate null check needed
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new RuntimeException(msg, e);
        } catch (Exception e) {
            String msg = "Error while trying to hflushOrSync!";
            LOG.error(msg + " " + e);
            throw new RuntimeException(msg, e);
        }
    }

    /**
     * Gets the hflush call using reflection. Fallback to sync if hflush is not available.
     *
     * <p>
     * Note: This code comes from Flume
     *
     * @param os the stream whose class is inspected; may be {@code null}
     * @return the {@code hflush} method, or {@code sync} as fallback, or {@code null} when
     *         {@code os} is {@code null}
     * @throws RuntimeException if neither method can be looked up
     */
    private Method reflectHflushOrSync(FSDataOutputStream os) {
        Method m = null;
        if (os != null) {
            Class<?> fsDataOutputStreamClass = os.getClass();
            try {
                m = fsDataOutputStreamClass.getMethod("hflush");
            } catch (NoSuchMethodException ex) {
                LOG.debug("HFlush not found. Will use sync() instead");
                try {
                    m = fsDataOutputStreamClass.getMethod("sync");
                } catch (Exception ex1) {
                    String msg = "Neither hflush not sync were found. That seems to be " +
                        "a problem!";
                    LOG.error(msg);
                    throw new RuntimeException(msg, ex1);
                }
            }
        }
        return m;
    }

    /**
     * Creates the part file (without overwriting an existing one) and, on first use, looks up
     * the flush method.
     *
     * @throws IllegalStateException if the writer is already open
     * @throws IOException if the file cannot be created
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);
        if (refHflushOrSync == null) {
            refHflushOrSync = reflectHflushOrSync(outStream);
        }
    }

    /**
     * Flushes the stream via hflush/sync and returns the stream position afterwards.
     *
     * @throws IllegalStateException if the writer is not open
     */
    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        hflushOrSync(outStream);
        return outStream.getPos();
    }

    /**
     * Returns the current position of the output stream.
     *
     * @throws IllegalStateException if the writer is not open
     */
    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        return outStream.getPos();
    }

    /**
     * Flushes and closes the stream; does nothing if the writer is not open.
     *
     * <p>
     * The stream is closed and forgotten even when the final flush fails, so that a failing
     * flush cannot leak the open stream.
     */
    @Override
    public void close() throws IOException {
        if (outStream != null) {
            try {
                flush();
            } finally {
                FSDataOutputStream streamToClose = outStream;
                outStream = null;
                streamToClose.close();
            }
        }
    }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java deleted file mode 100644 index 6568a86..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/StringWriter.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.flink.streaming.connectors.fs;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

/**
 * A {@link Writer} that uses {@code toString()} on the input elements and writes them to
 * the output bucket file separated by newline.
 *
 * @param <T> The type of the elements that are being written by the sink.
 */
public class StringWriter<T> extends StreamWriterBase<T> {
    private static final long serialVersionUID = 1L;

    /** Name of the charset; resolved to a {@link Charset} each time the writer is opened. */
    private String charsetName;

    /** The resolved charset; transient, therefore looked up again in {@code open}. */
    private transient Charset charset;

    /**
     * Creates a new {@code StringWriter} that uses {@code "UTF-8"} charset to convert
     * strings to bytes.
     */
    public StringWriter() {
        this("UTF-8");
    }

    /**
     * Creates a new {@code StringWriter} that uses the given charset to convert
     * strings to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for {@code Charset.forName(charsetName)}
     */
    public StringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    /**
     * Opens the part file and resolves the configured charset.
     *
     * @throws IOException if the file cannot be created, or if the charset name is illegal or
     *         the charset is not supported
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);

        try {
            this.charset = Charset.forName(charsetName);
        }
        catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        }
        catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    /**
     * Writes {@code element.toString()} encoded with the configured charset, followed by a
     * single {@code '\n'}.
     *
     * @throws IllegalStateException if the writer is not open
     */
    @Override
    public void write(T element) throws IOException {
        FSDataOutputStream outputStream = getStream();
        outputStream.write(element.toString().getBytes(charset));
        outputStream.write('\n');
    }

    /**
     * Creates an unopened copy of this writer.
     *
     * <p>
     * The copy is created with this writer's charset name; using the no-argument constructor
     * here would silently reset a custom charset to UTF-8 in every duplicated instance.
     */
    @Override
    public Writer<T> duplicate() {
        return new StringWriter<>(charsetName);
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java deleted file mode 100644 index 41663df..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/SystemClock.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.flink.streaming.connectors.fs;


/**
 * A {@link Clock} that uses {@code System.currentTimeMillis()} to determine the system time.
 */
public class SystemClock implements Clock {

    /**
     * Reads the wall-clock time from the JVM.
     *
     * @return the value of {@code System.currentTimeMillis()} at the time of the call
     */
    @Override
    public long currentTimeMillis() {
        final long nowMillis = System.currentTimeMillis();
        return nowMillis;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java deleted file mode 100644 index c3b4cb6..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/Writer.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.flink.streaming.connectors.fs;

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.Serializable;

/**
 * An implementation of {@code Writer} is used in conjunction with a
 * {@link BucketingSink} to perform the actual
 * writing to the bucket files.
 *
 * @param <T> The type of the elements that are being written by the sink.
 */
public interface Writer<T> extends Serializable {

    /**
     * Initializes the {@code Writer} for a newly opened bucket file.
     * Any internal per-bucket initialization should be performed here.
     *
     * @param fs The {@link org.apache.hadoop.fs.FileSystem} containing the newly opened file.
     * @param path The {@link org.apache.hadoop.fs.Path} of the newly opened file.
     * @throws IOException if the writer cannot be initialized for the file
     */
    void open(FileSystem fs, Path path) throws IOException;

    /**
     * Flushes out any internally held data, and returns the offset that the file
     * must be truncated to at recovery.
     *
     * @return the offset the file must be truncated to at recovery
     * @throws IOException if flushing fails
     */
    long flush() throws IOException;

    /**
     * Retrieves the current position, and thus size, of the output file.
     *
     * @return the current position in the output file
     * @throws IOException if the position cannot be determined
     */
    long getPos() throws IOException;

    /**
     * Closes the {@code Writer}. If the writer is already closed, no action will be
     * taken. The call should close all state related to the current output file,
     * including the output stream opened in {@code open}.
     *
     * @throws IOException if closing fails
     */
    void close() throws IOException;

    /**
     * Writes one element to the bucket file.
     *
     * @param element the element to write
     * @throws IOException if writing fails
     */
    void write(T element) throws IOException;

    /**
     * Duplicates the {@code Writer}. This is used to get one {@code Writer} for each
     * parallel instance of the sink.
     *
     * @return a new {@code Writer} instance
     */
    Writer<T> duplicate();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java deleted file mode 100644 index 0bf14b3..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BasePathBucketer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */
package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.hadoop.fs.Path;

/**
 * A {@link Bucketer} that does not perform any
 * bucketing of files. All files are written to the base path.
 */
public class BasePathBucketer<T> implements Bucketer<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Returns {@code basePath} unchanged; neither the clock nor the element is consulted.
     *
     * @param clock ignored by this bucketer
     * @param basePath the base path of the sink, returned as the bucket path
     * @param element ignored by this bucketer
     * @return {@code basePath}
     */
    @Override
    public Path getBucketPath(Clock clock, Path basePath, T element) {
        final Path bucketPath = basePath;
        return bucketPath;
    }

    /**
     * Returns the fixed name {@code "BasePathBucketer"}.
     */
    @Override
    public String toString() {
        final String description = "BasePathBucketer";
        return description;
    }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/de4fe3b7/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java deleted file mode 100644 index 86aa9f3..0000000 --- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/Bucketer.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.flink.streaming.connectors.fs.bucketing;

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.hadoop.fs.Path;

import java.io.Serializable;

/**
 * A bucketer is used with a {@link BucketingSink}
 * to put emitted elements into rolling files.
 *
 * <p>
 * The {@code BucketingSink} can be writing to many buckets at a time, and it is responsible for managing
 * a set of active buckets. Whenever a new element arrives it will ask the {@code Bucketer} for the bucket
 * path the element should fall in. The {@code Bucketer} can, for example, determine buckets based on
 * system time.
 *
 * @param <T> The type of the elements handed to the bucketer.
 */
public interface Bucketer<T> extends Serializable {
    /**
     * Returns the {@link Path} of a bucket file.
     *
     * @param clock The clock handed in by the caller; presumably the time source for
     *              time-based bucketing (confirm against the sink).
     * @param basePath The base path containing all the buckets.
     * @param element The current element being processed.
     *
     * @return The complete {@code Path} of the bucket which the provided element should fall in. This
     * should include the {@code basePath} and also the {@code subtaskIndex} to avoid clashes with
     * parallel sinks.
     */
    Path getBucketPath(Clock clock, Path basePath, T element);
}
