[
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=150240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-150240
]
ASF GitHub Bot logged work on BEAM-5445:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Oct/18 23:01
Start Date: 01/Oct/18 23:01
Worklog Time Spent: 10m
Work Description: nithinsujir commented on a change in pull request
#6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle.
Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221782468
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
##########
@@ -1074,59 +1061,61 @@ public void setup() {
batch = ImmutableList.builder();
batchSizeBytes = 0;
batchCells = 0;
- spannerAccessor = spannerConfig.connectToSpanner();
- }
-
- @Teardown
- public void teardown() {
- spannerAccessor.close();
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
SpannerSchema spannerSchema = c.sideInput(schemaView);
- MutationGroupEncoder mutationGroupEncoder = new MutationGroupEncoder(spannerSchema);
- KV<String, Iterable<SerializedMutation>> element = c.element();
- for (SerializedMutation kv : element.getValue()) {
- byte[] value = kv.getMutationGroupBytes();
- MutationGroup mg = mutationGroupEncoder.decode(value);
+ // Iterate through list, outputting whenever a batch is complete.
+ for (KV<byte[], byte[]> kv : c.element()) {
+ MutationGroup mg = decode(kv.getValue());
+
long groupSize = MutationSizeEstimator.sizeOf(mg);
long groupCells = MutationCellCounter.countOf(spannerSchema, mg);
- if (batchCells + groupCells > maxNumMutations
- || batchSizeBytes + groupSize > maxBatchSizeBytes) {
- ImmutableList<MutationGroup> mutations = batch.build();
- c.output(mutations);
- batch = ImmutableList.builder();
- batchSizeBytes = 0;
- batchCells = 0;
+ if (((batchCells + groupCells) > maxNumMutations)
+ || ((batchSizeBytes + groupSize) > maxBatchSizeBytes)) {
+ outputBatch(c);
Review comment:
Any reason why "c" needs to be passed around when everything else comes in
via a member variable?
And am I right that this is not thread-safe? I assume that's well understood
and accepted.
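The batching rule in the new loop can be sketched outside Beam. This is a minimal, hypothetical sketch, not the SpannerIO code: `BatchingSketch`, `Item`, `add`, `flush` and `batchAll` are invented names, items stand in for `MutationGroup`s, and their precomputed `sizeBytes`/`cells` stand in for `MutationSizeEstimator.sizeOf` and `MutationCellCounter.countOf`. Like the diff, it keeps the batch and its counters in fields and emits the current batch before adding a group that would push either total over its limit. It takes the output target as a parameter (the role `c` plays in `outputBatch(c)`). One plausible reason for that design, assumed here rather than taken from the PR, is that a Beam `DoFn` emits through a `ProcessContext` in `@ProcessElement` but through a different context (`FinishBundleContext`) when flushing at the end of a bundle. The fields are unsynchronized, which assumes one thread at a time per instance; the Beam programming guide states that each `DoFn` instance is accessed by a single thread at a time unless the user creates their own threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, Beam-free sketch of size- and cell-bounded batching.
 * Item stands in for MutationGroup; its sizeBytes and cells stand in for
 * MutationSizeEstimator.sizeOf and MutationCellCounter.countOf.
 */
public class BatchingSketch {

  /** Stand-in for a MutationGroup with a precomputed size and cell count. */
  public static final class Item {
    final String name;
    final long sizeBytes;
    final long cells;

    public Item(String name, long sizeBytes, long cells) {
      this.name = name;
      this.sizeBytes = sizeBytes;
      this.cells = cells;
    }
  }

  private final long maxBatchSizeBytes;
  private final long maxNumMutations;

  // Per-instance batch state, mirroring batch / batchSizeBytes / batchCells.
  // Not synchronized: assumes only one thread at a time uses this instance.
  private List<String> batch = new ArrayList<>();
  private long batchSizeBytes = 0;
  private long batchCells = 0;

  public BatchingSketch(long maxBatchSizeBytes, long maxNumMutations) {
    this.maxBatchSizeBytes = maxBatchSizeBytes;
    this.maxNumMutations = maxNumMutations;
  }

  /** Adds one item, first emitting the current batch if the item would overflow it. */
  public void add(Item item, Consumer<List<String>> output) {
    if (batchCells + item.cells > maxNumMutations
        || batchSizeBytes + item.sizeBytes > maxBatchSizeBytes) {
      flush(output);
    }
    // An item larger than a limit on its own still ends up alone in a batch.
    batch.add(item.name);
    batchSizeBytes += item.sizeBytes;
    batchCells += item.cells;
  }

  /** Emits the current batch if it is non-empty, then resets the counters. */
  public void flush(Consumer<List<String>> output) {
    if (!batch.isEmpty()) {
      output.accept(batch);
      batch = new ArrayList<>(); // fresh list, so the emitted one is never mutated
      batchSizeBytes = 0;
      batchCells = 0;
    }
  }

  /** Batches the items of one "bundle" and returns every emitted batch. */
  public static List<List<String>> batchAll(List<Item> items, long maxBytes, long maxCells) {
    BatchingSketch batcher = new BatchingSketch(maxBytes, maxCells);
    List<List<String>> out = new ArrayList<>();
    for (Item item : items) {
      batcher.add(item, out::add);
    }
    batcher.flush(out::add); // end of bundle: emit whatever is left over
    return out;
  }

  public static void main(String[] args) {
    List<Item> items = List.of(
        new Item("a", 40, 2), new Item("b", 40, 2),
        new Item("c", 40, 2), new Item("d", 10, 10));
    System.out.println(batchAll(items, 100, 10)); // prints [[a, b], [c], [d]]
  }
}
```

With limits of 100 bytes and 10 cells, `a` and `b` fit together, `c` would push the byte total to 120, and `d` would push the cell count to 12, so three batches are emitted. Unlike the removed lines, this `flush` skips empty batches; whether `outputBatch` does the same is not visible in this hunk.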
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 150240)
Time Spent: 2.5h (was: 2h 20m)
> Update SpannerIO to support unbounded writes
> --------------------------------------------
>
> Key: BEAM-5445
> URL: https://issues.apache.org/jira/browse/BEAM-5445
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Reporter: Chamikara Jayalath
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write
> do not actually write to Spanner.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)