This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fd98cf9 Add option to disable reshuffling of JdbcIO
new 4815d0d Merge pull request #7956: [BEAM-6670] Add `withOutputParallelization` option to disable reparallelization of JdbcIO.Read
fd98cf9 is described below
commit fd98cf91f16c9bc7fe6e77c2fa4866bf8cc3e056
Author: Mike Pedersen <[email protected]>
AuthorDate: Wed Feb 27 14:08:11 2019 +0100
Add option to disable reshuffling of JdbcIO
---
.../java/org/apache/beam/sdk/io/jdbc/JdbcIO.java | 59 +++++++++++++++++-----
1 file changed, 47 insertions(+), 12 deletions(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 354b26c..e6f2699 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -160,7 +160,10 @@ public class JdbcIO {
* @param <T> Type of the data to be read.
*/
public static <T> Read<T> read() {
- return new AutoValue_JdbcIO_Read.Builder<T>().setFetchSize(DEFAULT_FETCH_SIZE).build();
+ return new AutoValue_JdbcIO_Read.Builder<T>()
+ .setFetchSize(DEFAULT_FETCH_SIZE)
+ .setOutputParallelization(true)
+ .build();
}
/**
@@ -173,6 +176,7 @@ public class JdbcIO {
public static <ParameterT, OutputT> ReadAll<ParameterT, OutputT> readAll() {
return new AutoValue_JdbcIO_ReadAll.Builder<ParameterT, OutputT>()
.setFetchSize(DEFAULT_FETCH_SIZE)
+ .setOutputParallelization(true)
.build();
}
@@ -399,6 +403,8 @@ public class JdbcIO {
abstract int getFetchSize();
+ abstract boolean getOutputParallelization();
+
abstract Builder<T> toBuilder();
@AutoValue.Builder
@@ -415,6 +421,8 @@ public class JdbcIO {
abstract Builder<T> setFetchSize(int fetchSize);
+ abstract Builder<T> setOutputParallelization(boolean outputParallelization);
+
abstract Read<T> build();
}
@@ -457,6 +465,14 @@ public class JdbcIO {
return toBuilder().setFetchSize(fetchSize).build();
}
+ /**
+ * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+ * default is to parallelize and should only be changed if this is known to be unnecessary.
+ */
+ public Read<T> withOutputParallelization(boolean outputParallelization) {
+ return toBuilder().setOutputParallelization(outputParallelization).build();
+ }
+
@Override
public PCollection<T> expand(PBegin input) {
checkArgument(getQuery() != null, "withQuery() is required");
@@ -474,6 +490,7 @@ public class JdbcIO {
.withCoder(getCoder())
.withRowMapper(getRowMapper())
.withFetchSize(getFetchSize())
+ .withOutputParallelization(getOutputParallelization())
.withParameterSetter(
(element, preparedStatement) -> {
if (getStatementPreparator() != null) {
@@ -515,6 +532,8 @@ public class JdbcIO {
abstract int getFetchSize();
+ abstract boolean getOutputParallelization();
+
abstract Builder<ParameterT, OutputT> toBuilder();
@AutoValue.Builder
@@ -533,6 +552,8 @@ public class JdbcIO {
abstract Builder<ParameterT, OutputT> setFetchSize(int fetchSize);
+ abstract Builder<ParameterT, OutputT> setOutputParallelization(boolean outputParallelization);
+
abstract ReadAll<ParameterT, OutputT> build();
}
@@ -582,19 +603,33 @@ public class JdbcIO {
return toBuilder().setFetchSize(fetchSize).build();
}
+ /**
+ * Whether to reshuffle the resulting PCollection so results are distributed to all workers. The
+ * default is to parallelize and should only be changed if this is known to be unnecessary.
+ */
+ public ReadAll<ParameterT, OutputT> withOutputParallelization(boolean outputParallelization) {
+ return toBuilder().setOutputParallelization(outputParallelization).build();
+ }
+
@Override
public PCollection<OutputT> expand(PCollection<ParameterT> input) {
- return input
- .apply(
- ParDo.of(
- new ReadFn<>(
- getDataSourceConfiguration(),
- getQuery(),
- getParameterSetter(),
- getRowMapper(),
- getFetchSize())))
- .setCoder(getCoder())
- .apply(new Reparallelize<>());
+ PCollection<OutputT> output =
+ input
+ .apply(
+ ParDo.of(
+ new ReadFn<>(
+ getDataSourceConfiguration(),
+ getQuery(),
+ getParameterSetter(),
+ getRowMapper(),
+ getFetchSize())))
+ .setCoder(getCoder());
+
+ if (getOutputParallelization()) {
+ output = output.apply(new Reparallelize<>());
+ }
+
+ return output;
}
@Override