[
https://issues.apache.org/jira/browse/BEAM-1330?focusedWorklogId=713077&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-713077
]
ASF GitHub Bot logged work on BEAM-1330:
----------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jan/22 21:23
Start Date: 21/Jan/22 21:23
Worklog Time Spent: 10m
Work Description: pabloem commented on a change in pull request #16436:
URL: https://github.com/apache/beam/pull/16436#discussion_r790003577
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
##########
@@ -1400,6 +1401,7 @@ public int nextBatchSize(long timeSinceEpochMillis) {
private final V1DatastoreFactory datastoreFactory;
// Current batch of mutations to be written.
private final List<Mutation> mutations = new ArrayList<>();
+ private final HashSet<Mutation> uniqueMutations = new HashSet<>();
Review comment:
seems like we're not using this variable, right:? should we remove it?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
##########
@@ -1409,6 +1411,7 @@ public int nextBatchSize(long timeSinceEpochMillis) {
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcErrors");
private final Counter rpcSuccesses =
Metrics.counter(DatastoreWriterFn.class, "datastoreRpcSuccesses");
+ private final Counter duplicateKeys =
Metrics.counter(DatastoreWriterFn.class, "duplicateKeys");
Review comment:
I worry about what users may think when they see this in their UI. They
may think that we're duplicating data, although that's not exactly what's
happening.
Could we instead call this metric 'earlyFlushes' or something like that?
Another interesting idea would be to have a `Distribution` metric that
aggregates batch sizes instead of the earlyFlushes. This may be interesting for
users debugging their transforms; and I think it should be enough to run the
verifications for the test? WDYT? I think this would be ideal
##########
File path:
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
##########
@@ -94,9 +94,8 @@
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.*;
Review comment:
this file does not seem to have any other changes? Maybe revert changes
to this file?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 713077)
Time Spent: 1h 20m (was: 1h 10m)
> DatastoreIO Writes should flush early when duplicate keys arrive.
> -----------------------------------------------------------------
>
> Key: BEAM-1330
> URL: https://issues.apache.org/jira/browse/BEAM-1330
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Vikas Kedigehalli
> Assignee: Fernando Morales
> Priority: P3
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> DatastoreIO writes batches upto 500 entities (rpc limit for Cloud Datastore),
> before flushing them out. The writes are non-transactional and thus do not
> support duplicate keys in the writes. This can be problem, especially when
> using a non global windowing, where multiple windows for the same key end up
> in the same batch, and prevents the writes from succeeding.
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
--
This message was sent by Atlassian Jira
(v8.20.1#820001)