Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2797#discussion_r88032585 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception { /** * Gets the truncate() call using reflection. - * * <p> - * Note: This code comes from Flume + * <b>NOTE:</b> This code comes from Flume. */ private Method reflectTruncate(FileSystem fs) { - Method m = null; - if(fs != null) { - Class<?> fsClass = fs.getClass(); - try { - m = fsClass.getMethod("truncate", Path.class, long.class); - } catch (NoSuchMethodException ex) { - LOG.debug("Truncate not found. Will write a file with suffix '{}' " + - " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); - return null; - } + if (this.refTruncate == null) { + Method m = null; + if (fs != null) { + Class<?> fsClass = fs.getClass(); + try { + m = fsClass.getMethod("truncate", Path.class, long.class); + } catch (NoSuchMethodException ex) { + LOG.debug("Truncate not found. 
Will write a file with suffix '{}' " + + " and prefix '{}' to specify how many bytes in a bucket are valid.", + validLengthSuffix, validLengthPrefix); + return null; + } + + // verify that truncate actually works + FSDataOutputStream outputStream; + Path testPath = new Path(UUID.randomUUID().toString()); + try { + outputStream = fs.create(testPath); + outputStream.writeUTF("hello"); + outputStream.close(); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works.", e); + } + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } - // verify that truncate actually works - FSDataOutputStream outputStream; - Path testPath = new Path(UUID.randomUUID().toString()); - try { - outputStream = fs.create(testPath); - outputStream.writeUTF("hello"); - outputStream.close(); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file.", e); + } } + this.refTruncate = m; + } + return this.refTruncate; + } + private Path getPendingPathFor(Path path) { + return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); + } - try { - m.invoke(fs, testPath, 2); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.debug("Truncate is not supported.", e); - m = null; - } + private Path getInProgressPathFor(Path path) { + return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); + } - try { - fs.delete(testPath, false); - } catch (IOException e) { - 
LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file.", e); - } - } - return m; + private Path getValidLengthPathFor(Path path) { + return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (state.bucketStates) { - Iterator<Map.Entry<String, BucketState<T>>> it = state.bucketStates.entrySet().iterator(); - while (it.hasNext()) { - BucketState<T> bucketState = it.next().getValue(); + Iterator<Map.Entry<String, BucketState<T>>> stateIt = state.bucketStates.entrySet().iterator(); + while (stateIt.hasNext()) { + BucketState<T> bucketState = stateIt.next().getValue(); synchronized (bucketState.pendingFilesPerCheckpoint) { - Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); - Set<Long> checkpointsToRemove = new HashSet<>(); - for (Long pastCheckpointId : pastCheckpointIds) { + + Iterator<Map.Entry<Long, List<String>>> pendingIt = + bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); + + while (pendingIt.hasNext()) { + + Map.Entry<Long, List<String>> entry = pendingIt.next(); + Long pastCheckpointId = entry.getKey(); + List<String> pendingPaths = entry.getValue(); + if (pastCheckpointId <= checkpointId) { LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); - // All the pending files are buckets that have been completed but are waiting to be renamed - // to their final name - for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { + + for (String filename : pendingPaths) { Path finalPath = new Path(filename); - Path pendingPath = new Path(finalPath.getParent(), - pendingPrefix + finalPath.getName()).suffix(pendingSuffix); + Path pendingPath = getPendingPathFor(finalPath); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location having 
completed checkpoint {}.", pendingPath, pastCheckpointId); } - checkpointsToRemove.add(pastCheckpointId); + pendingIt.remove(); } } - if (!bucketState.isWriterOpen && bucketState.pendingFiles.isEmpty()) { + + if (!bucketState.isWriterOpen && + bucketState.pendingFiles.isEmpty()) { + // We've dealt with all the pending files and the writer for this bucket is not currently open. // Therefore this bucket is currently inactive and we can remove it from our state. - it.remove(); - } else { - for (Long toRemove : checkpointsToRemove) { - bucketState.pendingFilesPerCheckpoint.remove(toRemove); - } + stateIt.remove(); } } } } } @Override - public State<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); + + restoredBucketStates.clear(); + synchronized (state.bucketStates) { - for (BucketState<T> bucketState : state.bucketStates.values()) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { --- End diff -- Why did you change the loop? You never access the key of the entry.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---