pabloem commented on a change in pull request #15863:
URL: https://github.com/apache/beam/pull/15863#discussion_r767088333
##########
File path:
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1708,7 +1777,41 @@ void set(
checkArgument(
spec.getPreparedStatementSetter() != null,
"withPreparedStatementSetter() is required");
}
- return input
+ PCollection<Iterable<T>> iterables;
Review comment:
done. Added a helper method.
##########
File path:
sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOIT.java
##########
@@ -258,6 +263,40 @@ private PipelineResult runRead() {
return pipelineRead.run();
}
+ @Test
+ public void testWriteWithAutosharding() throws Exception {
+ String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
+ DatabaseTestHelper.createTable(dataSource, firstTableName);
+ try {
+ List<KV<Integer, String>> data = getTestDataToWrite(EXPECTED_ROW_COUNT);
+ TestStream.Builder<KV<Integer, String>> ts =
+ TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(Instant.now());
+ for (KV<Integer, String> elm : data) {
+ ts = ts.addElements(elm);
+ }
+
+ PCollection<KV<Integer, String>> dataCollection =
+ pipelineWrite.apply(ts.advanceWatermarkToInfinity());
+ dataCollection.apply(
+ JdbcIO.<KV<Integer, String>>write()
+ .withDataSourceProviderFn(voidInput -> dataSource)
+ .withStatement(String.format("insert into %s values(?, ?) returning *", firstTableName))
+ .withAutoSharding()
Review comment:
hm, not really possible the way things are now - perhaps we could analyze
the pipeline graph and check that the GroupIntoBatches (GIB) transform is
in it - but is that worth it?
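The suggestion above - walk the constructed pipeline graph and verify that a GroupIntoBatches transform appears in it - can be illustrated with a Beam-free sketch. In Beam proper this would be done with `Pipeline.traverseTopologically` and a `PipelineVisitor`; here a plain tree of named nodes stands in for the transform hierarchy, and all class and node names are illustrative, not Beam APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: detect whether a transform with a given name
// (e.g. "GroupIntoBatches") occurs anywhere in a transform graph.
class TransformGraphCheck {
    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        Node(String name) { this.name = name; }
        Node add(Node child) { children.add(child); return this; }
    }

    // Depth-first search for a transform whose name contains the marker.
    static boolean containsTransform(Node root, String marker) {
        if (root.name.contains(marker)) {
            return true;
        }
        for (Node child : root.children) {
            if (containsTransform(child, marker)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Mimics the autosharded JdbcIO.write() expansion:
        // WithKeys -> GroupIntoBatches -> Values.
        Node pipeline = new Node("root")
            .add(new Node("WithKeys"))
            .add(new Node("GroupIntoBatches").add(new Node("Values")));
        System.out.println(containsTransform(pipeline, "GroupIntoBatches")); // prints true
    }
}
```

The review's open question still stands: a test along these lines asserts on the expansion's internals, so it buys structural coverage at the cost of coupling the test to the transform's implementation.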
##########
File path:
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
##########
@@ -1549,8 +1570,47 @@ void set(
checkArgument(
(getDataSourceProviderFn() != null),
"withDataSourceConfiguration() or withDataSourceProviderFn() is required");
-
- return input.apply(
+ checkArgument(
+ getAutoSharding() == null
+ || (getAutoSharding() && input.isBounded() == IsBounded.UNBOUNDED),
+ "Autosharding is only supported for streaming pipelines.");
+
+ PCollection<Iterable<T>> iterables;
+ if (input.isBounded() == IsBounded.UNBOUNDED
+ && getAutoSharding() != null
+ && getAutoSharding()) {
+ iterables =
+ input
+ .apply(WithKeys.<String, T>of(""))
+ .apply(
+ GroupIntoBatches.<String, T>ofSize(DEFAULT_BATCH_SIZE)
+ .withMaxBufferingDuration(Duration.millis(200))
+ .withShardedKey())
+ .apply(Values.create());
+ } else {
+ iterables =
+ input.apply(
+ ParDo.of(
+ new DoFn<T, Iterable<T>>() {
+ List<T> outputList;
+
+ @ProcessElement
+ public void process(ProcessContext c) {
+ if (outputList == null) {
+ outputList = new ArrayList<>();
+ }
+ outputList.add(c.element());
Review comment:
fixed. Added a maximum row limit
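The buffering `DoFn` in the hunk above accumulates rows and, per the reply, now flushes once a maximum row limit is reached. A self-contained sketch of that buffer-and-flush logic, stripped of Beam (the class name, `maxRows` parameter, and method names are illustrative; in the real `DoFn` the flush would emit via the process context and `@FinishBundle` would drain the remainder):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the batching DoFn's state: elements are
// buffered until maxRows is reached, then the batch is emitted and
// the buffer is reset.
class BatchBuffer<T> {
    private final int maxRows;
    private List<T> outputList = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();

    BatchBuffer(int maxRows) { this.maxRows = maxRows; }

    // Mirrors @ProcessElement: buffer the element, flush when full.
    void process(T element) {
        outputList.add(element);
        if (outputList.size() >= maxRows) {
            flush();
        }
    }

    // Mirrors @FinishBundle: emit any partially filled final batch.
    void finishBundle() {
        if (!outputList.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        emitted.add(outputList);
        outputList = new ArrayList<>();
    }

    List<List<T>> getEmitted() { return emitted; }

    public static void main(String[] args) {
        BatchBuffer<Integer> b = new BatchBuffer<>(3);
        for (int i = 0; i < 7; i++) {
            b.process(i);
        }
        b.finishBundle();
        // 7 elements with maxRows = 3 yield batches [0,1,2], [3,4,5], [6].
        System.out.println(b.getEmitted().size()); // prints 3
    }
}
```

The cap matters because this bounded-input path has no windowing trigger to force a flush: without a row limit, a large bundle would buffer every element before writing.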
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]