[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667335#comment-15667335 ]
ASF GitHub Bot commented on FLINK-5056:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2797#discussion_r88027157
--- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java ---
@@ -570,284 +583,278 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
 	/**
 	 * Gets the truncate() call using reflection.
-	 *
 	 * <p>
-	 * Note: This code comes from Flume
+	 * <b>NOTE:</b> This code comes from Flume.
 	 */
 	private Method reflectTruncate(FileSystem fs) {
-		Method m = null;
-		if(fs != null) {
-			Class<?> fsClass = fs.getClass();
-			try {
-				m = fsClass.getMethod("truncate", Path.class, long.class);
-			} catch (NoSuchMethodException ex) {
-				LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
-					" and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
-				return null;
-			}
+		if (this.refTruncate == null) {
+			Method m = null;
+			if (fs != null) {
+				Class<?> fsClass = fs.getClass();
+				try {
+					m = fsClass.getMethod("truncate", Path.class, long.class);
+				} catch (NoSuchMethodException ex) {
+					LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
+						" and prefix '{}' to specify how many bytes in a bucket are valid.",
+						validLengthSuffix, validLengthPrefix);
+					return null;
+				}
+				// verify that truncate actually works
+				FSDataOutputStream outputStream;
+				Path testPath = new Path(UUID.randomUUID().toString());
+				try {
+					outputStream = fs.create(testPath);
+					outputStream.writeUTF("hello");
+					outputStream.close();
+				} catch (IOException e) {
+					LOG.error("Could not create file for checking if truncate works.", e);
+					throw new RuntimeException("Could not create file for checking if truncate works.", e);
+				}
-			// verify that truncate actually works
-			FSDataOutputStream outputStream;
-			Path testPath = new Path(UUID.randomUUID().toString());
-			try {
-				outputStream = fs.create(testPath);
-				outputStream.writeUTF("hello");
-				outputStream.close();
-			} catch (IOException e) {
-				LOG.error("Could not create file for checking if truncate works.", e);
-				throw new RuntimeException("Could not create file for checking if truncate works.", e);
+				try {
+					m.invoke(fs, testPath, 2);
+				} catch (IllegalAccessException | InvocationTargetException e) {
+					LOG.debug("Truncate is not supported.", e);
+					m = null;
+				}
+
+				try {
+					fs.delete(testPath, false);
+				} catch (IOException e) {
+					LOG.error("Could not delete truncate test file.", e);
+					throw new RuntimeException("Could not delete truncate test file.", e);
+				}
 			}
+			this.refTruncate = m;
+		}
+		return this.refTruncate;
+	}
+	private Path getPendingPathFor(Path path) {
+		return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix);
+	}
-		try {
-			m.invoke(fs, testPath, 2);
-		} catch (IllegalAccessException | InvocationTargetException e) {
-			LOG.debug("Truncate is not supported.", e);
-			m = null;
-		}
+	private Path getInProgressPathFor(Path path) {
+		return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
+	}
-		try {
-			fs.delete(testPath, false);
-		} catch (IOException e) {
-			LOG.error("Could not delete truncate test file.", e);
-			throw new RuntimeException("Could not delete truncate test file.", e);
-		}
-	}
-	return m;
+	private Path getValidLengthPathFor(Path path) {
+		return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix);
 	}
 	@Override
 	public void notifyCheckpointComplete(long checkpointId) throws Exception {
 		synchronized (state.bucketStates) {
-			Iterator<Map.Entry<String, BucketState<T>>> it = state.bucketStates.entrySet().iterator();
-			while (it.hasNext()) {
-				BucketState<T> bucketState = it.next().getValue();
+			Iterator<Map.Entry<String, BucketState<T>>> stateIt = state.bucketStates.entrySet().iterator();
+			while (stateIt.hasNext()) {
+				BucketState<T> bucketState = stateIt.next().getValue();
 				synchronized (bucketState.pendingFilesPerCheckpoint) {
-					Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet();
-					Set<Long> checkpointsToRemove = new HashSet<>();
-					for (Long pastCheckpointId : pastCheckpointIds) {
+
+					Iterator<Map.Entry<Long, List<String>>> pendingIt =
+						bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
+
+					while (pendingIt.hasNext()) {
+
+						Map.Entry<Long, List<String>> entry = pendingIt.next();
+						Long pastCheckpointId = entry.getKey();
+						List<String> pendingPaths = entry.getValue();
+
 						if (pastCheckpointId <= checkpointId) {
 							LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId);
-							// All the pending files are buckets that have been completed but are waiting to be renamed
-							// to their final name
-							for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
+
+							for (String filename : pendingPaths) {
 								Path finalPath = new Path(filename);
-								Path pendingPath = new Path(finalPath.getParent(),
-									pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
+								Path pendingPath = getPendingPathFor(finalPath);
 								fs.rename(pendingPath, finalPath);
 								LOG.debug(
 									"Moving pending file {} to final location having completed checkpoint {}.",
 									pendingPath,
 									pastCheckpointId);
 							}
-							checkpointsToRemove.add(pastCheckpointId);
+							pendingIt.remove();
 						}
 					}
-					if (!bucketState.isWriterOpen && bucketState.pendingFiles.isEmpty()) {
+
+					if (!bucketState.isWriterOpen &&
+							bucketState.pendingFiles.isEmpty() &&
+							bucketState.pendingFilesPerCheckpoint.isEmpty()) {
+
 						// We've dealt with all the pending files and the writer for this bucket is not currently open.
 						// Therefore this bucket is currently inactive and we can remove it from our state.
-						it.remove();
-					} else {
-						for (Long toRemove : checkpointsToRemove) {
-							bucketState.pendingFilesPerCheckpoint.remove(toRemove);
-						}
+						stateIt.remove();
 					}
 				}
 			}
 		}
 	}
 	@Override
-	public State<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+	public void snapshotState(FunctionSnapshotContext context) throws Exception {
+		Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized.");
+
+		restoredBucketStates.clear();
+
 		synchronized (state.bucketStates) {
-			for (BucketState<T> bucketState : state.bucketStates.values()) {
+			int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
+
+			for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) {
+				BucketState<T> bucketState = bucketStateEntry.getValue();
+
 				if (bucketState.isWriterOpen) {
-					long pos = bucketState.writer.flush();
-					bucketState.currentFileValidLength = pos;
+					bucketState.currentFileValidLength = bucketState.writer.flush();
 				}
+
 				synchronized (bucketState.pendingFilesPerCheckpoint) {
-					bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles);
+					bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles);
 				}
 				bucketState.pendingFiles = new ArrayList<>();
 			}
+			restoredBucketStates.add(state);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("BucketingSink idx {} checkpointed {}.", subtaskIdx, state);
+			}
 		}
-		return state;
 	}
-	@Override
-	public void restoreState(State<T> state) {
-		this.state = state;
-
-		try {
-			initFileSystem();
-		} catch (IOException e) {
-			LOG.error("Error while creating FileSystem in checkpoint restore.", e);
-			throw new RuntimeException("Error while creating FileSystem in checkpoint restore.", e);
-		}
+	private void handleRestoredBucketState(BucketState<T> bucketState) {
+		// we can clean all the pending files since they were renamed to
--- End diff ---
According to the PR description, we no longer delete files at this point, since multiple tasks may use the same file. Under which circumstances can this occur?
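
For readers outside the review thread, the probe-and-cache pattern that the new reflectTruncate() implements can be seen in isolation in the sketch below. LocalFs and the String-based truncate signature are hypothetical stand-ins introduced purely for illustration; the real code probes truncate(Path, long) on a Hadoop FileSystem and verifies it against an actual test file before caching the Method:

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectProbeSketch {

	/** Hypothetical stand-in for a Hadoop FileSystem; not a Flink or Hadoop class. */
	public static class LocalFs {
		// Present on newer Hadoop FileSystems, absent on older ones.
		public boolean truncate(String path, long newLength) {
			return newLength >= 0;
		}
	}

	// Cached probe result; stays null if truncate() is missing or unusable.
	private Method refTruncate;

	public Method reflectTruncate(LocalFs fs) {
		if (refTruncate == null && fs != null) {
			Method m;
			try {
				// Resolve the optional method on the concrete class.
				m = fs.getClass().getMethod("truncate", String.class, long.class);
			} catch (NoSuchMethodException ex) {
				// No truncate(): the sink falls back to writing valid-length files.
				return null;
			}
			try {
				// Verify the method actually works by invoking it on a probe
				// target, mirroring the test-file invocation in the diff.
				m.invoke(fs, "probe", 2L);
			} catch (IllegalAccessException | InvocationTargetException e) {
				m = null;
			}
			refTruncate = m;
		}
		return refTruncate;
	}

	public static void main(String[] args) {
		ReflectProbeSketch sketch = new ReflectProbeSketch();
		// First call probes and caches; second call returns the cached Method.
		System.out.println(sketch.reflectTruncate(new LocalFs()));
		System.out.println(sketch.reflectTruncate(new LocalFs()));
	}
}

Caching the resolved Method means the relatively expensive probe, including the test-file round trip in the real code, happens at most once per sink instance.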
> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
> Key: FLINK-5056
> URL: https://issues.apache.org/jira/browse/FLINK-5056
> Project: Flink
> Issue Type: Bug
> Components: filesystem-connector
> Affects Versions: 1.1.3
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.2.0
>
>
> Currently, if BucketingSink receives no data after a checkpoint and then a
> notification about a previous checkpoint arrives, it clears its state. This
> can lead to not committing valid data for intermediate checkpoints for which
> a notification has not arrived yet. As a simple sequence that illustrates
> the problem:
> -> input data
> -> snapshot(0)
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last notification will clear the state of the Sink without committing as
> final the data that arrived for checkpoint 1.
>
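
A minimal runnable model of this sequence, assuming a single bucket: BucketModel and the part-file names below are simplified stand-ins for the sink's real state, not Flink API, and the two booleans contrast the old bucket-removal condition with the fixed one from the diff above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class Flink5056Sketch {

	/** Simplified stand-in for BucketingSink's per-bucket state. */
	static class BucketModel {
		boolean isWriterOpen = false;              // no data since snapshot(1)
		List<String> pendingFiles = new ArrayList<>();
		Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
	}

	public static void main(String[] args) {
		BucketModel bucket = new BucketModel();

		// input data -> snapshot(0) -> input data -> snapshot(1) -> no data
		bucket.pendingFilesPerCheckpoint.put(0L, Arrays.asList("part-0-0"));
		bucket.pendingFilesPerCheckpoint.put(1L, Arrays.asList("part-0-1"));

		// notifyCheckpointComplete(0): commit everything up to checkpoint 0.
		long completedCheckpointId = 0L;
		Iterator<Map.Entry<Long, List<String>>> pendingIt =
			bucket.pendingFilesPerCheckpoint.entrySet().iterator();
		while (pendingIt.hasNext()) {
			Map.Entry<Long, List<String>> entry = pendingIt.next();
			if (entry.getKey() <= completedCheckpointId) {
				System.out.println("committing " + entry.getValue());
				pendingIt.remove();
			}
		}

		// Old removal condition: drops the bucket even though checkpoint 1's
		// files are still pending, so they are never committed.
		boolean oldDrop = !bucket.isWriterOpen && bucket.pendingFiles.isEmpty();

		// Fixed condition from the diff: also require the per-checkpoint map
		// to be empty, keeping the bucket alive until checkpoint 1 is notified.
		boolean newDrop = oldDrop && bucket.pendingFilesPerCheckpoint.isEmpty();

		System.out.println("old condition drops bucket state: " + oldDrop); // true (bug)
		System.out.println("new condition drops bucket state: " + newDrop); // false
	}
}

Running this prints that only checkpoint 0's file is committed; the old condition would already drop the bucket state (losing part-0-1), while the fixed condition keeps the bucket until notifyCheckpointComplete(1) arrives.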
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)