This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd76cc130f Deleted initialNumReaders paramter. (#24355)
3bd76cc130f is described below

commit 3bd76cc130ffee98e6f33023936b703e0d4e1c7b
Author: AdalbertMemSQL <[email protected]>
AuthorDate: Wed Nov 30 06:53:19 2022 +0200

    Deleted initialNumReaders paramter. (#24355)
    
    * Deleted initialNumReaders paramter.
    Pre-split restrictions to the maximum number of readers.
    See the discussion in the design doc for more details: 
https://docs.google.com/document/d/1WU-hkoZ93SaGXyOz_UtX0jXzIRl194hCId_IdmEV9jw/edit?disco=AAAAjCSqPvs
    
    * Nit reformatting
    
    * Fixed bug in splitRange function
---
 .../beam/sdk/io/singlestore/SingleStoreIO.java     | 41 ++----------
 .../sdk/io/singlestore/ReadWithPartitionsTest.java | 76 ++++------------------
 2 files changed, 16 insertions(+), 101 deletions(-)

diff --git 
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
 
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index a698d713e1c..6873ae6b8b3 100644
--- 
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++ 
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -545,8 +545,6 @@ public class SingleStoreIO {
 
     abstract @Nullable RowMapper<T> getRowMapper();
 
-    abstract @Nullable Integer getInitialNumReaders();
-
     abstract Builder<T> toBuilder();
 
     @AutoValue.Builder
@@ -560,8 +558,6 @@ public class SingleStoreIO {
 
       abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
 
-      abstract Builder<T> setInitialNumReaders(Integer initialNumReaders);
-
       abstract ReadWithPartitions<T> build();
     }
 
@@ -585,12 +581,6 @@ public class SingleStoreIO {
       return toBuilder().setRowMapper(rowMapper).build();
     }
 
-    /** Pre-split initial restriction and start initialNumReaders reading at 
the very beginning. */
-    public ReadWithPartitions<T> withInitialNumReaders(Integer 
initialNumReaders) {
-      checkNotNull(initialNumReaders, "initialNumReaders can not be null");
-      return toBuilder().setInitialNumReaders(initialNumReaders).build();
-    }
-
     @Override
     public PCollection<T> expand(PBegin input) {
       DataSourceConfiguration dataSourceConfiguration = 
getDataSourceConfiguration();
@@ -603,10 +593,6 @@ public class SingleStoreIO {
       RowMapper<T> rowMapper = getRowMapper();
       Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is 
required");
 
-      int initialNumReaders = 
SingleStoreUtil.getArgumentWithDefault(getInitialNumReaders(), 1);
-      checkArgument(
-          initialNumReaders >= 1, "withInitialNumReaders() should be greater 
or equal to 1");
-
       String actualQuery = SingleStoreUtil.getSelectQuery(getTable(), 
getQuery());
 
       Coder<T> coder =
@@ -621,11 +607,7 @@ public class SingleStoreIO {
           .apply(
               ParDo.of(
                   new ReadWithPartitions.ReadWithPartitionsFn<>(
-                      dataSourceConfiguration,
-                      actualQuery,
-                      database,
-                      rowMapper,
-                      initialNumReaders)))
+                      dataSourceConfiguration, actualQuery, database, 
rowMapper)))
           .setCoder(coder);
     }
 
@@ -635,19 +617,16 @@ public class SingleStoreIO {
       String query;
       String database;
       RowMapper<OutputT> rowMapper;
-      int initialNumReaders;
 
       ReadWithPartitionsFn(
           DataSourceConfiguration dataSourceConfiguration,
           String query,
           String database,
-          RowMapper<OutputT> rowMapper,
-          int initialNumReaders) {
+          RowMapper<OutputT> rowMapper) {
         this.dataSourceConfiguration = dataSourceConfiguration;
         this.query = query;
         this.database = database;
         this.rowMapper = rowMapper;
-        this.initialNumReaders = initialNumReaders;
       }
 
       @ProcessElement
@@ -690,19 +669,8 @@ public class SingleStoreIO {
           @Element ParameterT element,
           @Restriction OffsetRange range,
           OutputReceiver<OffsetRange> receiver) {
-        long numPartitions = range.getTo() - range.getFrom();
-        checkArgument(
-            initialNumReaders <= numPartitions,
-            "withInitialNumReaders() should not be greater then number of 
partitions in the database.\n"
-                + String.format(
-                    "InitialNumReaders is %d, number of partitions in the 
database is %d",
-                    initialNumReaders, range.getTo()));
-
-        for (int i = 0; i < initialNumReaders; i++) {
-          receiver.output(
-              new OffsetRange(
-                  range.getFrom() + numPartitions * i / initialNumReaders,
-                  range.getFrom() + numPartitions * (i + 1) / 
initialNumReaders));
+        for (long i = range.getFrom(); i < range.getTo(); i++) {
+          receiver.output(new OffsetRange(i, i + 1));
         }
       }
 
@@ -744,7 +712,6 @@ public class SingleStoreIO {
       builder.addIfNotNull(DisplayData.item("table", getTable()));
       builder.addIfNotNull(
           DisplayData.item("rowMapper", 
SingleStoreUtil.getClassNameOrNull(getRowMapper())));
-      builder.addIfNotNull(DisplayData.item("initialNumReaders", 
getInitialNumReaders()));
     }
   }
 
diff --git 
a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
 
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
index 8b96e6f9909..32ca3568026 100644
--- 
a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
+++ 
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
@@ -148,67 +148,16 @@ public class ReadWithPartitionsTest {
     pipeline.run();
   }
 
-  @Test
-  public void testReadWithPartitionsWithInitialNumReaders() {
-    PCollection<TestRow> rows =
-        pipeline.apply(
-            SingleStoreIO.<TestRow>readWithPartitions()
-                .withDataSourceConfiguration(dataSourceConfiguration)
-                .withQuery("SELECT * FROM `t`")
-                .withRowMapper(new TestHelper.TestRowMapper())
-                .withInitialNumReaders(2));
-
-    PAssert.thatSingleton(rows.apply("Count All", Count.globally()))
-        .isEqualTo((long) EXPECTED_ROW_COUNT);
-
-    Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, 
EXPECTED_ROW_COUNT);
-    PAssert.that(rows).containsInAnyOrder(expectedValues);
-
-    pipeline.run();
-  }
-
-  @Test
-  public void testReadWithPartitionsZeroInitialNumReaders() {
-    assertThrows(
-        "withInitialNumReaders() should be greater or equal to 1",
-        IllegalArgumentException.class,
-        () -> {
-          pipelineForErrorChecks.apply(
-              SingleStoreIO.<TestRow>readWithPartitions()
-                  .withDataSourceConfiguration(dataSourceConfiguration)
-                  .withTable("t")
-                  .withInitialNumReaders(0)
-                  .withRowMapper(new TestHelper.TestRowMapper()));
-        });
-  }
-
-  @Test
-  public void testReadWithPartitionsTooBigInitialNumReaders() {
-    pipelineForErrorChecks.apply(
-        SingleStoreIO.<TestRow>readWithPartitions()
-            .withDataSourceConfiguration(dataSourceConfiguration)
-            .withTable("t")
-            .withInitialNumReaders(100)
-            .withRowMapper(new TestHelper.TestRowMapper()));
-
-    assertThrows(
-        "withInitialNumReaders() should not be greater then number of 
partitions in the database.\n"
-            + "InitialNumReaders is 100, number of partitions in the database 
is 2",
-        Pipeline.PipelineExecutionException.class,
-        () -> pipelineForErrorChecks.run().waitUntilFinish());
-  }
-
   @Test
   public void testReadWithPartitionsNoTableAndQuery() {
     assertThrows(
         "One of withTable() or withQuery() is required",
         IllegalArgumentException.class,
-        () -> {
-          pipelineForErrorChecks.apply(
-              SingleStoreIO.<TestRow>readWithPartitions()
-                  .withDataSourceConfiguration(dataSourceConfiguration)
-                  .withRowMapper(new TestHelper.TestRowMapper()));
-        });
+        () ->
+            pipelineForErrorChecks.apply(
+                SingleStoreIO.<TestRow>readWithPartitions()
+                    .withDataSourceConfiguration(dataSourceConfiguration)
+                    .withRowMapper(new TestHelper.TestRowMapper())));
   }
 
   @Test
@@ -216,13 +165,12 @@ public class ReadWithPartitionsTest {
     assertThrows(
         "withTable() can not be used together with withQuery()",
         IllegalArgumentException.class,
-        () -> {
-          pipelineForErrorChecks.apply(
-              SingleStoreIO.<TestRow>readWithPartitions()
-                  .withDataSourceConfiguration(dataSourceConfiguration)
-                  .withTable("t")
-                  .withQuery("SELECT * FROM `t`")
-                  .withRowMapper(new TestHelper.TestRowMapper()));
-        });
+        () ->
+            pipelineForErrorChecks.apply(
+                SingleStoreIO.<TestRow>readWithPartitions()
+                    .withDataSourceConfiguration(dataSourceConfiguration)
+                    .withTable("t")
+                    .withQuery("SELECT * FROM `t`")
+                    .withRowMapper(new TestHelper.TestRowMapper())));
   }
 }

Reply via email to