This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3bd76cc130f Deleted initialNumReaders parameter. (#24355)
3bd76cc130f is described below
commit 3bd76cc130ffee98e6f33023936b703e0d4e1c7b
Author: AdalbertMemSQL <[email protected]>
AuthorDate: Wed Nov 30 06:53:19 2022 +0200
Deleted initialNumReaders parameter. (#24355)
* Deleted initialNumReaders parameter.
Pre-split restrictions to the maximum number of readers.
See the discussion in the design doc for more details:
https://docs.google.com/document/d/1WU-hkoZ93SaGXyOz_UtX0jXzIRl194hCId_IdmEV9jw/edit?disco=AAAAjCSqPvs
* Nit reformatting
* Fixed bug in splitRange function
---
.../beam/sdk/io/singlestore/SingleStoreIO.java | 41 ++----------
.../sdk/io/singlestore/ReadWithPartitionsTest.java | 76 ++++------------------
2 files changed, 16 insertions(+), 101 deletions(-)
diff --git
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
index a698d713e1c..6873ae6b8b3 100644
---
a/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
+++
b/sdks/java/io/singlestore/src/main/java/org/apache/beam/sdk/io/singlestore/SingleStoreIO.java
@@ -545,8 +545,6 @@ public class SingleStoreIO {
abstract @Nullable RowMapper<T> getRowMapper();
- abstract @Nullable Integer getInitialNumReaders();
-
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -560,8 +558,6 @@ public class SingleStoreIO {
abstract Builder<T> setRowMapper(RowMapper<T> rowMapper);
- abstract Builder<T> setInitialNumReaders(Integer initialNumReaders);
-
abstract ReadWithPartitions<T> build();
}
@@ -585,12 +581,6 @@ public class SingleStoreIO {
return toBuilder().setRowMapper(rowMapper).build();
}
- /** Pre-split initial restriction and start initialNumReaders reading at
the very beginning. */
- public ReadWithPartitions<T> withInitialNumReaders(Integer
initialNumReaders) {
- checkNotNull(initialNumReaders, "initialNumReaders can not be null");
- return toBuilder().setInitialNumReaders(initialNumReaders).build();
- }
-
@Override
public PCollection<T> expand(PBegin input) {
DataSourceConfiguration dataSourceConfiguration =
getDataSourceConfiguration();
@@ -603,10 +593,6 @@ public class SingleStoreIO {
RowMapper<T> rowMapper = getRowMapper();
Preconditions.checkArgumentNotNull(rowMapper, "withRowMapper() is
required");
- int initialNumReaders =
SingleStoreUtil.getArgumentWithDefault(getInitialNumReaders(), 1);
- checkArgument(
- initialNumReaders >= 1, "withInitialNumReaders() should be greater
or equal to 1");
-
String actualQuery = SingleStoreUtil.getSelectQuery(getTable(),
getQuery());
Coder<T> coder =
@@ -621,11 +607,7 @@ public class SingleStoreIO {
.apply(
ParDo.of(
new ReadWithPartitions.ReadWithPartitionsFn<>(
- dataSourceConfiguration,
- actualQuery,
- database,
- rowMapper,
- initialNumReaders)))
+ dataSourceConfiguration, actualQuery, database,
rowMapper)))
.setCoder(coder);
}
@@ -635,19 +617,16 @@ public class SingleStoreIO {
String query;
String database;
RowMapper<OutputT> rowMapper;
- int initialNumReaders;
ReadWithPartitionsFn(
DataSourceConfiguration dataSourceConfiguration,
String query,
String database,
- RowMapper<OutputT> rowMapper,
- int initialNumReaders) {
+ RowMapper<OutputT> rowMapper) {
this.dataSourceConfiguration = dataSourceConfiguration;
this.query = query;
this.database = database;
this.rowMapper = rowMapper;
- this.initialNumReaders = initialNumReaders;
}
@ProcessElement
@@ -690,19 +669,8 @@ public class SingleStoreIO {
@Element ParameterT element,
@Restriction OffsetRange range,
OutputReceiver<OffsetRange> receiver) {
- long numPartitions = range.getTo() - range.getFrom();
- checkArgument(
- initialNumReaders <= numPartitions,
- "withInitialNumReaders() should not be greater then number of
partitions in the database.\n"
- + String.format(
- "InitialNumReaders is %d, number of partitions in the
database is %d",
- initialNumReaders, range.getTo()));
-
- for (int i = 0; i < initialNumReaders; i++) {
- receiver.output(
- new OffsetRange(
- range.getFrom() + numPartitions * i / initialNumReaders,
- range.getFrom() + numPartitions * (i + 1) /
initialNumReaders));
+ for (long i = range.getFrom(); i < range.getTo(); i++) {
+ receiver.output(new OffsetRange(i, i + 1));
}
}
@@ -744,7 +712,6 @@ public class SingleStoreIO {
builder.addIfNotNull(DisplayData.item("table", getTable()));
builder.addIfNotNull(
DisplayData.item("rowMapper",
SingleStoreUtil.getClassNameOrNull(getRowMapper())));
- builder.addIfNotNull(DisplayData.item("initialNumReaders",
getInitialNumReaders()));
}
}
diff --git
a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
index 8b96e6f9909..32ca3568026 100644
---
a/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
+++
b/sdks/java/io/singlestore/src/test/java/org/apache/beam/sdk/io/singlestore/ReadWithPartitionsTest.java
@@ -148,67 +148,16 @@ public class ReadWithPartitionsTest {
pipeline.run();
}
- @Test
- public void testReadWithPartitionsWithInitialNumReaders() {
- PCollection<TestRow> rows =
- pipeline.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withQuery("SELECT * FROM `t`")
- .withRowMapper(new TestHelper.TestRowMapper())
- .withInitialNumReaders(2));
-
- PAssert.thatSingleton(rows.apply("Count All", Count.globally()))
- .isEqualTo((long) EXPECTED_ROW_COUNT);
-
- Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0,
EXPECTED_ROW_COUNT);
- PAssert.that(rows).containsInAnyOrder(expectedValues);
-
- pipeline.run();
- }
-
- @Test
- public void testReadWithPartitionsZeroInitialNumReaders() {
- assertThrows(
- "withInitialNumReaders() should be greater or equal to 1",
- IllegalArgumentException.class,
- () -> {
- pipelineForErrorChecks.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable("t")
- .withInitialNumReaders(0)
- .withRowMapper(new TestHelper.TestRowMapper()));
- });
- }
-
- @Test
- public void testReadWithPartitionsTooBigInitialNumReaders() {
- pipelineForErrorChecks.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable("t")
- .withInitialNumReaders(100)
- .withRowMapper(new TestHelper.TestRowMapper()));
-
- assertThrows(
- "withInitialNumReaders() should not be greater then number of
partitions in the database.\n"
- + "InitialNumReaders is 100, number of partitions in the database
is 2",
- Pipeline.PipelineExecutionException.class,
- () -> pipelineForErrorChecks.run().waitUntilFinish());
- }
-
@Test
public void testReadWithPartitionsNoTableAndQuery() {
assertThrows(
"One of withTable() or withQuery() is required",
IllegalArgumentException.class,
- () -> {
- pipelineForErrorChecks.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withRowMapper(new TestHelper.TestRowMapper()));
- });
+ () ->
+ pipelineForErrorChecks.apply(
+ SingleStoreIO.<TestRow>readWithPartitions()
+ .withDataSourceConfiguration(dataSourceConfiguration)
+ .withRowMapper(new TestHelper.TestRowMapper())));
}
@Test
@@ -216,13 +165,12 @@ public class ReadWithPartitionsTest {
assertThrows(
"withTable() can not be used together with withQuery()",
IllegalArgumentException.class,
- () -> {
- pipelineForErrorChecks.apply(
- SingleStoreIO.<TestRow>readWithPartitions()
- .withDataSourceConfiguration(dataSourceConfiguration)
- .withTable("t")
- .withQuery("SELECT * FROM `t`")
- .withRowMapper(new TestHelper.TestRowMapper()));
- });
+ () ->
+ pipelineForErrorChecks.apply(
+ SingleStoreIO.<TestRow>readWithPartitions()
+ .withDataSourceConfiguration(dataSourceConfiguration)
+ .withTable("t")
+ .withQuery("SELECT * FROM `t`")
+ .withRowMapper(new TestHelper.TestRowMapper())));
}
}