[
https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306602#comment-16306602
]
ASF GitHub Bot commented on NIFI-4715:
--------------------------------------
Github user adamlamar commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2361#discussion_r159111452
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 commit(context, session, listCount);
                 listCount = 0;
             } while (bucketLister.isTruncated());
-        currentTimestamp = maxTimestamp;
+
+        if (maxTimestamp > currentTimestamp) {
+            currentTimestamp = maxTimestamp;
+        }
         final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
         getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
         if (!commit(context, session, listCount)) {
-            if (currentTimestamp > 0) {
-                persistState(context);
-            }
             getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
             context.yield();
         }
+
+        // Persist all state, including any currentKeys
+        persistState(context);
--- End diff --
@ijokarumawak I started writing an example, but then realized you are
correct - there is no need to manually call `persistState` because any addition
to `currentKeys` will also increment `listCount`, and the normal update
mechanism will take over from there. We shouldn't need a `dirtyState` flag.
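The interplay described above can be sketched in a small, hypothetical model (simplified names and fields; not the real ListS3 class): every key added to `currentKeys` also bumps `listCount`, so the normal `commit()` path persists state without any extra `persistState` call or `dirtyState` flag.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the commit/state interplay, assuming simplified
// stand-ins for ListS3's currentKeys, listCount, and state persistence.
public class CommitSketch {
    Set<String> currentKeys = new HashSet<>();
    int listCount = 0;
    boolean statePersisted = false;

    // Any addition to currentKeys also increments listCount...
    void listObject(String key) {
        currentKeys.add(key);
        listCount++;
    }

    // ...so the regular commit path sees listCount > 0 and persists state.
    boolean commit() {
        if (listCount > 0) {
            statePersisted = true;
            listCount = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CommitSketch s = new CommitSketch();
        s.listObject("bucket/a.txt");
        System.out.println(s.commit());        // true: the normal path fired
        System.out.println(s.statePersisted);  // true: no manual persistState needed
    }
}
```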
> ListS3 produces duplicates in frequently updated buckets
> --------------------------------------------------------
>
> Key: NIFI-4715
> URL: https://issues.apache.org/jira/browse/NIFI-4715
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.2.0, 1.3.0, 1.4.0
> Environment: All
> Reporter: Milan Das
> Attachments: List-S3-dup-issue.xml, screenshot-1.png
>
>
> ListS3 state is implemented using a HashSet, which is not thread safe. When
> ListS3 operates in multi-threaded mode, it sometimes tries to list the same
> file from the S3 bucket. It seems like the HashSet data is getting corrupted.
> {code}
> currentKeys = new HashSet<>(); // needs a thread-safe implementation, e.g.
> currentKeys = ConcurrentHashMap.newKeySet();
> {code}
> *{color:red}+Update+{color}*:
> This is not a HashSet issue. The root cause is:
> The problem occurs when a file is uploaded to S3 while a ListS3 run is in
> progress. In onTrigger, maxTimestamp is initialized to 0L, which causes the
> keys to be cleared in the block below.
> When the lastModifiedTime of an S3 object equals currentTimestamp, the listed
> key should be skipped. Because the key has been cleared, the same file is
> listed again.
> I think the fix should be to initialize maxTimestamp with currentTimestamp,
> not 0L:
> {code}
> long maxTimestamp = currentTimestamp;
> {code}
> The following block clears the keys:
> {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
> if (lastModified > maxTimestamp) {
>     maxTimestamp = lastModified;
>     currentKeys.clear();
>     getLogger().debug("clearing keys");
> }
> {code}
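The root cause quoted above can be illustrated with a standalone, hypothetical sketch (names mirror ListS3 but this is not the real class): once `maxTimestamp` is seeded from `currentTimestamp` instead of `0L`, an object whose lastModified equals `currentTimestamp` no longer wipes `currentKeys`, so it is correctly skipped as a duplicate.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the timestamp/key-clearing logic described in the
// issue; currentTimestamp and currentKeys stand in for ListS3's state.
public class ListS3Sketch {
    long currentTimestamp = 1000L;              // timestamp persisted from the last run
    Set<String> currentKeys = new HashSet<>();  // keys already listed at currentTimestamp

    // Simulates processing one listed object under the proposed fix.
    boolean isNewObject(String key, long lastModified) {
        long maxTimestamp = currentTimestamp;   // the fix: seed with currentTimestamp, not 0L
        if (lastModified > maxTimestamp) {
            maxTimestamp = lastModified;
            currentKeys.clear();                // only clear when time actually advances
        }
        if (lastModified == currentTimestamp && currentKeys.contains(key)) {
            return false;                       // already listed in a previous run: skip
        }
        currentKeys.add(key);
        return true;
    }

    public static void main(String[] args) {
        ListS3Sketch sketch = new ListS3Sketch();
        sketch.currentKeys.add("bucket/old.txt");
        // Same lastModified as currentTimestamp: treated as a duplicate, not re-listed.
        System.out.println(sketch.isNewObject("bucket/old.txt", 1000L)); // false
        // Newer object: listed, and the key set resets for the new timestamp.
        System.out.println(sketch.isNewObject("bucket/new.txt", 2000L)); // true
    }
}
```

With the original `long maxTimestamp = 0L;`, the first listed object would always satisfy `lastModified > maxTimestamp`, clearing `currentKeys` and re-emitting the old key.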
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)