[ 
https://issues.apache.org/jira/browse/BEAM-5445?focusedWorklogId=149223&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-149223
 ]

ASF GitHub Bot logged work on BEAM-5445:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Sep/18 17:16
            Start Date: 28/Sep/18 17:16
    Worklog Time Spent: 10m 
      Work Description: nithinsujir commented on a change in pull request #6478: [BEAM-5445] [BEAM-4796] SpannerIO: Only batch on the current bundle. Adds streaming support
URL: https://github.com/apache/beam/pull/6478#discussion_r221315969
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java
 ##########
 @@ -371,97 +276,46 @@ public void noBatching() throws Exception {
 
   @Test
   @Category(NeedsRunner.class)
-  public void batchingPlusSampling() throws Exception {
-    PCollection<MutationGroup> mutations =
-        pipeline.apply(
-            Create.of(
-                g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)),
-                g(m(9L)), g(m(10L))));
-
-    mutations.apply(
-        SpannerIO.write()
-            .withProjectId("test-project")
-            .withInstanceId("test-instance")
-            .withDatabaseId("test-database")
-            .withServiceFactory(serviceFactory)
-            .withBatchSizeBytes(1000000000)
-            .withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-            .grouped());
-    pipeline.run();
+  public void reportFailures() throws Exception {
 
-    verifyBatches(
-        batch(m(1L), m(2L)), batch(m(3L), m(4L), m(5L)), batch(m(6L), m(7L), m(8L), m(9L), m(10L)));
-  }
+    MutationGroup[] mutationGroups = new MutationGroup[10];
+    for (int i = 0; i < mutationGroups.length; i++) {
+      mutationGroups[i] = g(m((long) i));
+    }
 
-  @Test
-  @Category(NeedsRunner.class)
-  public void reportFailures() throws Exception {
-    PCollection<MutationGroup> mutations =
-        pipeline.apply(
-            Create.of(
-                g(m(1L)), g(m(2L)), g(m(3L)), g(m(4L)), g(m(5L)), g(m(6L)), g(m(7L)), g(m(8L)),
-                g(m(9L)), g(m(10L))));
+    List<MutationGroup> mutationGroupList = Arrays.asList(mutationGroups);
 
     when(serviceFactory.mockDatabaseClient().writeAtLeastOnce(any()))
         .thenAnswer(
             invocationOnMock -> {
+              Preconditions.checkNotNull(invocationOnMock.getArguments()[0]);
               throw SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "oops");
             });
 
     SpannerWriteResult result =
-        mutations.apply(
-            SpannerIO.write()
-                .withProjectId("test-project")
-                .withInstanceId("test-instance")
-                .withDatabaseId("test-database")
-                .withServiceFactory(serviceFactory)
-                .withBatchSizeBytes(1000000000)
-                .withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES)
-                .withSampler(fakeSampler(m(2L), m(5L), m(10L)))
-                .grouped());
+        pipeline
+            .apply(Create.of(mutationGroupList))
+            .apply(
+                SpannerIO.write()
+                    .withProjectId("test-project")
+                    .withInstanceId("test-instance")
+                    .withDatabaseId("test-database")
+                    .withServiceFactory(serviceFactory)
+                    .withBatchSizeBytes(0)
 
 Review comment:
   BatchSizeBytes went to 0. Is that intentional?
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 149223)
    Time Spent: 1.5h  (was: 1h 20m)

> Update SpannerIO to support unbounded writes
> --------------------------------------------
>
>                 Key: BEAM-5445
>                 URL: https://issues.apache.org/jira/browse/BEAM-5445
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Chamikara Jayalath
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Currently, due to a known issue, streaming pipelines that use SpannerIO.Write 
> do not actually write to Spanner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
