[ 
https://issues.apache.org/jira/browse/BEAM-4061?focusedWorklogId=97210&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-97210
 ]

ASF GitHub Bot logged work on BEAM-4061:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/May/18 19:54
            Start Date: 01/May/18 19:54
    Worklog Time Spent: 10m 
      Work Description: jkff commented on a change in pull request #4264: 
[BEAM-4061] Introduced SpannerWriteResult
URL: https://github.com/apache/beam/pull/4264#discussion_r185315783
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 ##########
 @@ -777,21 +797,23 @@ public PDone expand(PCollection<MutationGroup> input) {
               .apply("Sample keys", sampler)
               .apply("Keys sample as view", View.asMap());
 
+      TupleTag<Void> mainTag = new TupleTag<>("mainOut");
+      TupleTag<MutationGroup> failedTag = new TupleTag<>("failedMutations");
       // Assign partition based on the closest element in the sample and group mutations.
       AssignPartitionFn assignPartitionFn = new AssignPartitionFn(keySample);
-      serialized
+      PCollectionTuple result = serialized
           .apply("Partition input", ParDo.of(assignPartitionFn).withSideInputs(keySample))
           .setCoder(KvCoder.of(StringUtf8Coder.of(), SerializedMutationCoder.of()))
-          .apply("Group by partition", GroupByKey.create())
-          .apply(
-              "Batch mutations together",
+          .apply("Group by partition", GroupByKey.create()).apply("Batch mutations together",
               ParDo.of(new BatchFn(spec.getBatchSizeBytes(), spec.getSpannerConfig(), schemaView))
-                  .withSideInputs(schemaView))
-          .apply(
-              "Write mutations to Spanner",
-              ParDo.of(new WriteToSpannerFn(spec.getSpannerConfig())));
-      return PDone.in(input.getPipeline());
-
+                  .withSideInputs(schemaView)).apply("Write mutations to Spanner",
+              ParDo.of(new WriteToSpannerFn(spec.getSpannerConfig(), spec.getFailureMode(),
+                  failedTag))
+                  .withOutputTags(mainTag, TupleTagList.of(failedTag)));
+      PCollection<MutationGroup> failedMutations = result.get(failedTag);
+      failedMutations.setCoder(SerializableCoder.of(MutationGroup.class));
 
 Review comment:
   It's unfortunate that we can't directly use MutationGroupEncoder as a Coder, 
since it requires a Schema known at construction time...


Issue Time Tracking
-------------------

    Worklog Id:     (was: 97210)
    Time Spent: 1h 50m  (was: 1h 40m)

> Chaining SpannerIO#write() transforms
> -------------------------------------
>
>                 Key: BEAM-4061
>                 URL: https://issues.apache.org/jira/browse/BEAM-4061
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Mairbek Khadikov
>            Assignee: Mairbek Khadikov
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> It should be possible to chain several Cloud Spanner writes. In practice, we 
> can leverage the Wait.on transform by returning a result object from 
> SpannerIO#write.
> One example where this feature is useful is a full database import, where 
> data in parent tables must be written before data in interleaved tables. See 
> more about table hierarchies in Spanner here: 
> https://cloud.google.com/spanner/docs/schema-and-data-model#creating_a_hierarchy_of_interleaved_tables



