mosche commented on a change in pull request #16863:
URL: https://github.com/apache/beam/pull/16863#discussion_r835314536



##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -383,10 +470,25 @@ public void processElement(ProcessContext c) {
       T upperBound = c.element().getValue().getValue();
       JdbcReadWithPartitionsHelper<T> helper =
           
JdbcReadWithPartitionsHelper.getPartitionsHelper(partitioningColumnType);
-      List<KV<T, T>> ranges =
-          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
-      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
-      for (KV<T, T> e : ranges) {
+      if (c.element().getKey() == 1L) {
+        c.output(
+            new StartEndRange<T>(
+                c.element().getValue().getKey(),

Review comment:
       ```suggestion
                   lowerBound,
   ```

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -383,10 +470,25 @@ public void processElement(ProcessContext c) {
       T upperBound = c.element().getValue().getValue();
       JdbcReadWithPartitionsHelper<T> helper =
           
JdbcReadWithPartitionsHelper.getPartitionsHelper(partitioningColumnType);
-      List<KV<T, T>> ranges =
-          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
-      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
-      for (KV<T, T> e : ranges) {
+      if (c.element().getKey() == 1L) {
+        c.output(
+            new StartEndRange<T>(
+                c.element().getValue().getKey(),
+                c.element().getValue().getValue(),

Review comment:
       ```suggestion
                   upperBound,
   ```

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -383,10 +470,25 @@ public void processElement(ProcessContext c) {
       T upperBound = c.element().getValue().getValue();
       JdbcReadWithPartitionsHelper<T> helper =
           
JdbcReadWithPartitionsHelper.getPartitionsHelper(partitioningColumnType);
-      List<KV<T, T>> ranges =
-          Lists.newArrayList(helper.calculateRanges(lowerBound, upperBound, 
c.element().getKey()));
-      LOG.warn("Total of {} ranges: {}", ranges.size(), ranges);
-      for (KV<T, T> e : ranges) {
+      if (c.element().getKey() == 1L) {

Review comment:
       Also store `c.element().getKey()` in a local variable to make the code 
easier to follow?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -429,18 +535,18 @@ public void setParameters(KV<Long, Long> element, 
PreparedStatement preparedStat
             public KV<Long, KV<Long, Long>> mapRow(ResultSet resultSet) throws 
Exception {
               if (resultSet.getMetaData().getColumnCount() == 3) {
                 return KV.of(
-                    resultSet.getLong(3), KV.of(resultSet.getLong(1), 
resultSet.getLong(2)));
+                    resultSet.getLong(3), KV.of(resultSet.getLong(1), 
resultSet.getLong(2) + 1));

Review comment:
       This will fail if the column contains max long. Not sure how likely that 
is, but there's a risk.

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/ReadSDF.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.JdbcReadWithPartitionsHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.StartEndRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} executing the SQL query to read from the database. */
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@BoundedPerElement
+public class ReadSDF<ParameterT, PartitionT, OutputT> extends DoFn<ParameterT, 
OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadSDF.class);
+  private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+  private final String query;
+  private final PreparedStatementSetter<ParameterT> parameterSetter;
+  private final RowMapper<OutputT> rowMapper;
+  private final Coder<ParameterT> restrictionCoder;
+  private final int fetchSize;
+
+  private DataSource dataSource;
+  private Connection connection;
+
+  ReadSDF(
+      SerializableFunction<Void, DataSource> dataSourceProviderFn,
+      String query,
+      PreparedStatementSetter<ParameterT> parameterSetter,
+      RowMapper<OutputT> rowMapper,
+      int fetchSize,
+      Coder<ParameterT> restrictionCoder) {
+    this.dataSourceProviderFn = dataSourceProviderFn;
+    this.query = query;
+    this.parameterSetter = parameterSetter;
+    this.rowMapper = rowMapper;
+    this.fetchSize = fetchSize;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  static class ReadRestrictionTracker<ParameterT, PositionT>
+      extends RestrictionTracker<ParameterT, PositionT> {
+    private PositionT lowerBound;
+    private PositionT lastClaimed;
+    private PositionT upperBound;
+    private final JdbcReadWithPartitionsHelper<PositionT> helper;
+    private final TypeDescriptor<PositionT> typeDescriptor;
+    private final String columnName;
+
+    ReadRestrictionTracker(
+        PositionT lowerBound,
+        PositionT upperBound,
+        JdbcReadWithPartitionsHelper<PositionT> helper,
+        TypeDescriptor<PositionT> typeDescriptor,
+        String columnName) {
+      this.lowerBound = lowerBound;
+      this.lastClaimed = lowerBound;
+      this.upperBound = upperBound;
+      this.helper = helper;
+      this.typeDescriptor = typeDescriptor;
+      this.columnName = columnName;
+    }
+
+    @Override
+    public boolean tryClaim(PositionT position) {
+      if (position == null) {
+        return true;
+      } else {
+        // Position is a value that we want to claim.
+        if (upperBound instanceof DateTime && position instanceof 
java.sql.Timestamp) {
+          this.lastClaimed = (PositionT) new DateTime(position);
+          return ((DateTime) upperBound).getMillis() >= new 
DateTime(position).getMillis();
+        } else {
+          assert ((Comparable<PositionT>) position).compareTo(lowerBound) >= 0;
+          this.lastClaimed = position;
+          return ((Comparable<PositionT>) position).compareTo(upperBound) < 0;
+        }
+      }
+    }
+
+    @Override
+    public ParameterT currentRestriction() {
+      if (columnName == null) {
+        return null;
+      } else {
+        // TODO: Validate that ParameterT is `StartEndRange<PositionT>` 
somehow.
+        return (ParameterT)
+            new StartEndRange<PositionT>(lastClaimed, upperBound, 
typeDescriptor, columnName);
+      }
+    }
+
+    @Override
+    public @Nullable SplitResult<ParameterT> trySplit(double 
fractionOfRemainder) {
+      // If the column name is null, then this means that we are not trying to 
partition this
+      // query based on a particular function, so we reject any split requests.
+      if (columnName != null) {
+        List<StartEndRange<PositionT>> ranges =
+            Lists.newArrayList(
+                this.helper.calculateRanges(this.lastClaimed, this.upperBound, 
2L, columnName));
+
+        // TODO: Validate that ParameterT is `StartEndRange<PositionT>` 
somehow.
+        if (ranges.size() > 1) {
+          this.upperBound = ranges.get(0).end();
+          SplitResult<ParameterT> result =
+              SplitResult.<ParameterT>of(
+                  (ParameterT) ranges.get(0),
+                  (ParameterT)
+                      new StartEndRange<PositionT>(
+                          ranges.get(1).start(),
+                          ranges.get(ranges.size() - 1).end(),
+                          ranges.get(0).type(),
+                          ranges.get(0).columnName()));
+          LOG.info(
+              "Splitting range ({},{}),{} into {}",
+              this.lowerBound,
+              this.lastClaimed,
+              this.upperBound,
+              result);

Review comment:
       Not too important, but might confuse people. The split is already 
applied to `upperBound`, this is logging the wrong thing ...

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/ReadSDF.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.JdbcReadWithPartitionsHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.StartEndRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} executing the SQL query to read from the database. */
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@BoundedPerElement
+public class ReadSDF<ParameterT, PartitionT, OutputT> extends DoFn<ParameterT, 
OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadSDF.class);
+  private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+  private final String query;
+  private final PreparedStatementSetter<ParameterT> parameterSetter;
+  private final RowMapper<OutputT> rowMapper;
+  private final Coder<ParameterT> restrictionCoder;
+  private final int fetchSize;
+
+  private DataSource dataSource;
+  private Connection connection;
+
+  ReadSDF(
+      SerializableFunction<Void, DataSource> dataSourceProviderFn,
+      String query,
+      PreparedStatementSetter<ParameterT> parameterSetter,
+      RowMapper<OutputT> rowMapper,
+      int fetchSize,
+      Coder<ParameterT> restrictionCoder) {
+    this.dataSourceProviderFn = dataSourceProviderFn;
+    this.query = query;
+    this.parameterSetter = parameterSetter;
+    this.rowMapper = rowMapper;
+    this.fetchSize = fetchSize;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  static class ReadRestrictionTracker<ParameterT, PositionT>
+      extends RestrictionTracker<ParameterT, PositionT> {
+    private PositionT lowerBound;
+    private PositionT lastClaimed;
+    private PositionT upperBound;
+    private final JdbcReadWithPartitionsHelper<PositionT> helper;
+    private final TypeDescriptor<PositionT> typeDescriptor;
+    private final String columnName;
+
+    ReadRestrictionTracker(
+        PositionT lowerBound,
+        PositionT upperBound,
+        JdbcReadWithPartitionsHelper<PositionT> helper,
+        TypeDescriptor<PositionT> typeDescriptor,
+        String columnName) {
+      this.lowerBound = lowerBound;
+      this.lastClaimed = lowerBound;
+      this.upperBound = upperBound;
+      this.helper = helper;
+      this.typeDescriptor = typeDescriptor;
+      this.columnName = columnName;
+    }
+
+    @Override
+    public boolean tryClaim(PositionT position) {
+      if (position == null) {
+        return true;
+      } else {
+        // Position is a value that we want to claim.
+        if (upperBound instanceof DateTime && position instanceof 
java.sql.Timestamp) {
+          this.lastClaimed = (PositionT) new DateTime(position);
+          return ((DateTime) upperBound).getMillis() >= new 
DateTime(position).getMillis();
+        } else {
+          assert ((Comparable<PositionT>) position).compareTo(lowerBound) >= 0;
+          this.lastClaimed = position;
+          return ((Comparable<PositionT>) position).compareTo(upperBound) < 0;
+        }
+      }
+    }
+
+    @Override
+    public ParameterT currentRestriction() {
+      if (columnName == null) {
+        return null;
+      } else {
+        // TODO: Validate that ParameterT is `StartEndRange<PositionT>` 
somehow.
+        return (ParameterT)
+            new StartEndRange<PositionT>(lastClaimed, upperBound, 
typeDescriptor, columnName);
+      }
+    }
+
+    @Override
+    public @Nullable SplitResult<ParameterT> trySplit(double 
fractionOfRemainder) {
+      // If the column name is null, then this means that we are not trying to 
partition this
+      // query based on a particular function, so we reject any split requests.
+      if (columnName != null) {
+        List<StartEndRange<PositionT>> ranges =
+            Lists.newArrayList(
+                this.helper.calculateRanges(this.lastClaimed, this.upperBound, 
2L, columnName));
+
+        // TODO: Validate that ParameterT is `StartEndRange<PositionT>` 
somehow.
+        if (ranges.size() > 1) {
+          this.upperBound = ranges.get(0).end();
+          SplitResult<ParameterT> result =
+              SplitResult.<ParameterT>of(
+                  (ParameterT) ranges.get(0),
+                  (ParameterT)
+                      new StartEndRange<PositionT>(
+                          ranges.get(1).start(),
+                          ranges.get(ranges.size() - 1).end(),
+                          ranges.get(0).type(),
+                          ranges.get(0).columnName()));
+          LOG.info(
+              "Splitting range ({},{}),{} into {}",
+              this.lowerBound,
+              this.lastClaimed,
+              this.upperBound,
+              result);
+          return result;
+        }
+        LOG.info(
+            "Unable to split range ({},{}){}", this.lowerBound, 
this.lastClaimed, this.upperBound);
+      }
+      return null;
+    }
+
+    @Override
+    public void checkDone() throws IllegalStateException {}
+
+    @Override
+    public IsBounded isBounded() {
+      return IsBounded.BOUNDED;
+    }
+  }
+
+  @DoFn.GetInitialRestriction
+  public ParameterT getInitialRestriction(@DoFn.Element ParameterT elm) {
+    return elm;
+  }
+
+  @GetRestrictionCoder
+  public Coder<ParameterT> getRestrictionCoder() {
+    return restrictionCoder;
+  }
+
+  @NewTracker
+  public ReadRestrictionTracker<ParameterT, PartitionT> restrictionTracker(
+      @Restriction ParameterT restriction) {
+    if (restriction instanceof StartEndRange) {
+      StartEndRange<PartitionT> rangeRestriction = (StartEndRange<PartitionT>) 
restriction;
+      return new ReadRestrictionTracker<ParameterT, PartitionT>(
+          rangeRestriction.start(),
+          rangeRestriction.end(),
+          (JdbcReadWithPartitionsHelper<PartitionT>)
+              
JdbcUtil.PRESET_HELPERS.get(rangeRestriction.type().getRawType()),
+          rangeRestriction.type(),
+          ((StartEndRange<?>) restriction).columnName());
+    } else {
+      return new ReadRestrictionTracker<ParameterT, PartitionT>(null, null, 
null, null, null);
+    }
+  }
+
+  @Setup
+  public void setup() throws Exception {
+    dataSource = dataSourceProviderFn.apply(null);
+  }
+
+  @ProcessElement
+  // Spotbugs seems to not understand the nested try-with-resources
+  @SuppressFBWarnings("OBL_UNSATISFIED_OBLIGATION")
+  public void processElement(
+      @Element ParameterT range,
+      RestrictionTracker<ParameterT, PartitionT> tracker,
+      OutputReceiver<OutputT> outputReceiver)
+      throws Exception {
+    // Only acquire the connection if we need to perform a read.
+    if (connection == null) {
+      connection = dataSource.getConnection();
+    }
+    // PostgreSQL requires autocommit to be disabled to enable cursor streaming
+    // see 
https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
+    LOG.info("Autocommit has been disabled");
+    connection.setAutoCommit(false);
+    try (PreparedStatement statement =
+        connection.prepareStatement(
+            query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+      statement.setFetchSize(fetchSize);
+      // This DoFn can be called with non-splittable parameters, or with 
splittable parameters.
+      // If the parameters are splittable, then the restriction will not be 
null, and the parameters
+      // will be part of the restriction (therefore 
tracker.currentRestriction() != null).
+      // A "splittable parameter" wil represent a StartEndRange produced within
+      // JdbcIO.ReadWithPartitions.
+      // If the parameters are non-splittable (i.e. a normal, user-provided 
parameter from
+      // JdbcIO.Read or JdbcIO.ReadVoid.
+      if (tracker.currentRestriction() != null) {
+        parameterSetter.setParameters(tracker.currentRestriction(), statement);
+      } else {
+        parameterSetter.setParameters(range, statement);
+      }
+      try (ResultSet resultSet = statement.executeQuery()) {
+        while (resultSet.next()) {
+          Object position = null;
+          if (range instanceof StartEndRange) {
+            position = resultSet.getObject(((StartEndRange<?>) 
range).columnName());
+          }
+          // For null positions (i.e. a non-splittable parameter), the 
restriction tracker will
+          // always allow us to claim said position, while for non-null 
positions, we may have
+          // performed a split.
+          if (tracker.tryClaim((PartitionT) position)) {
+            outputReceiver.output(rowMapper.mapRow(resultSet));
+          } else {
+            // We have arrived to the end of the restriction, and we must end 
the query
+            return;

Review comment:
       Wondering, don't you have to return `DoFn.ProcessContinuation.stop`?
   
   > If this succeeds, the DoFn MUST execute the entire block of work. If this 
fails:
   DoFn.ProcessElement MUST return DoFn.ProcessContinuation.stop without 
performing any additional work or emitting output (note that emitting output or 
performing work from DoFn

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/ReadSDF.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.JdbcReadWithPartitionsHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.StartEndRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} executing the SQL query to read from the database. */
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@BoundedPerElement
+public class ReadSDF<ParameterT, PartitionT, OutputT> extends DoFn<ParameterT, 
OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadSDF.class);
+  private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+  private final String query;
+  private final PreparedStatementSetter<ParameterT> parameterSetter;
+  private final RowMapper<OutputT> rowMapper;
+  private final Coder<ParameterT> restrictionCoder;
+  private final int fetchSize;
+
+  private DataSource dataSource;
+  private Connection connection;
+
+  ReadSDF(
+      SerializableFunction<Void, DataSource> dataSourceProviderFn,
+      String query,
+      PreparedStatementSetter<ParameterT> parameterSetter,
+      RowMapper<OutputT> rowMapper,
+      int fetchSize,
+      Coder<ParameterT> restrictionCoder) {
+    this.dataSourceProviderFn = dataSourceProviderFn;
+    this.query = query;
+    this.parameterSetter = parameterSetter;
+    this.rowMapper = rowMapper;
+    this.fetchSize = fetchSize;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  static class ReadRestrictionTracker<ParameterT, PositionT>
+      extends RestrictionTracker<ParameterT, PositionT> {
+    private PositionT lowerBound;
+    private PositionT lastClaimed;
+    private PositionT upperBound;
+    private final JdbcReadWithPartitionsHelper<PositionT> helper;
+    private final TypeDescriptor<PositionT> typeDescriptor;
+    private final String columnName;
+
+    ReadRestrictionTracker(
+        PositionT lowerBound,
+        PositionT upperBound,
+        JdbcReadWithPartitionsHelper<PositionT> helper,
+        TypeDescriptor<PositionT> typeDescriptor,
+        String columnName) {
+      this.lowerBound = lowerBound;
+      this.lastClaimed = lowerBound;
+      this.upperBound = upperBound;
+      this.helper = helper;
+      this.typeDescriptor = typeDescriptor;
+      this.columnName = columnName;
+    }
+
+    @Override
+    public boolean tryClaim(PositionT position) {
+      if (position == null) {
+        return true;
+      } else {
+        // Position is a value that we want to claim.
+        if (upperBound instanceof DateTime && position instanceof 
java.sql.Timestamp) {
+          this.lastClaimed = (PositionT) new DateTime(position);
+          return ((DateTime) upperBound).getMillis() >= new 
DateTime(position).getMillis();
+        } else {
+          assert ((Comparable<PositionT>) position).compareTo(lowerBound) >= 0;
+          this.lastClaimed = position;
+          return ((Comparable<PositionT>) position).compareTo(upperBound) < 0;
+        }
+      }
+    }
+
+    @Override
+    public ParameterT currentRestriction() {
+      if (columnName == null) {
+        return null;
+      } else {
+        // TODO: Validate that ParameterT is `StartEndRange<PositionT>` 
somehow.

Review comment:
       Why not define `ReadRestrictionTracker` as follows to avoid all the 
casting?
   ```java
   static class ReadRestrictionTracker<PositionT>
         extends RestrictionTracker<StartEndRange<PositionT>, PositionT>{
   }
   ```

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/ReadSDF.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.JdbcReadWithPartitionsHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.StartEndRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} executing the SQL query to read from the database. */
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+@BoundedPerElement
+public class ReadSDF<ParameterT, PartitionT, OutputT> extends DoFn<ParameterT, 
OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadSDF.class);
+  private final SerializableFunction<Void, DataSource> dataSourceProviderFn;
+  private final String query;
+  private final PreparedStatementSetter<ParameterT> parameterSetter;
+  private final RowMapper<OutputT> rowMapper;
+  private final Coder<ParameterT> restrictionCoder;
+  private final int fetchSize;
+
+  private DataSource dataSource;
+  private Connection connection;
+
+  ReadSDF(
+      SerializableFunction<Void, DataSource> dataSourceProviderFn,
+      String query,
+      PreparedStatementSetter<ParameterT> parameterSetter,
+      RowMapper<OutputT> rowMapper,
+      int fetchSize,
+      Coder<ParameterT> restrictionCoder) {
+    this.dataSourceProviderFn = dataSourceProviderFn;
+    this.query = query;
+    this.parameterSetter = parameterSetter;
+    this.rowMapper = rowMapper;
+    this.fetchSize = fetchSize;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  static class ReadRestrictionTracker<ParameterT, PositionT>
+      extends RestrictionTracker<ParameterT, PositionT> {
+    private PositionT lowerBound;
+    private PositionT lastClaimed;
+    private PositionT upperBound;
+    private final JdbcReadWithPartitionsHelper<PositionT> helper;
+    private final TypeDescriptor<PositionT> typeDescriptor;
+    private final String columnName;
+
+    ReadRestrictionTracker(
+        PositionT lowerBound,
+        PositionT upperBound,
+        JdbcReadWithPartitionsHelper<PositionT> helper,
+        TypeDescriptor<PositionT> typeDescriptor,
+        String columnName) {
+      this.lowerBound = lowerBound;
+      this.lastClaimed = lowerBound;
+      this.upperBound = upperBound;
+      this.helper = helper;
+      this.typeDescriptor = typeDescriptor;
+      this.columnName = columnName;
+    }
+
+    @Override
+    public boolean tryClaim(PositionT position) {
+      if (position == null) {
+        return true;
+      } else {
+        // Position is a value that we want to claim.
+        if (upperBound instanceof DateTime && position instanceof 
java.sql.Timestamp) {
+          this.lastClaimed = (PositionT) new DateTime(position);
+          return ((DateTime) upperBound).getMillis() >= new 
DateTime(position).getMillis();
+        } else {
+          assert ((Comparable<PositionT>) position).compareTo(lowerBound) >= 0;

Review comment:
       This could even compare against `lastClaimed`, right?

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -358,23 +360,108 @@ private static Calendar 
withTimestampAndTimezone(DateTime dateTime) {
       return (JdbcReadWithPartitionsHelper<T>) 
PRESET_HELPERS.get(type.getRawType());
     }
 
-    Iterable<KV<PartitionT, PartitionT>> calculateRanges(
-        PartitionT lowerBound, PartitionT upperBound, Long partitions);
+    Iterable<StartEndRange<PartitionT>> calculateRanges(
+        PartitionT lowerBound, PartitionT upperBound, Long partitions, String 
columnName);
 
     @Override
-    void setParameters(KV<PartitionT, PartitionT> element, PreparedStatement 
preparedStatement);
+    void setParameters(StartEndRange<PartitionT> element, PreparedStatement 
preparedStatement);
 
     @Override
     KV<Long, KV<PartitionT, PartitionT>> mapRow(ResultSet resultSet) throws 
Exception;
   }
 
+  public static class StartEndRange<T> implements Serializable {
+    private final T start;
+    private final T end;
+    private final TypeDescriptor<T> typeDescriptor;
+    private final String columnName;
+
+    StartEndRange(T start, T end, TypeDescriptor<T> typeDescriptor, String 
columnName) {
+      this.start = start;
+      this.end = end;
+      this.typeDescriptor = typeDescriptor;
+      this.columnName = columnName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof StartEndRange)) {
+        return false;
+      }
+      StartEndRange<?> that = (StartEndRange<?>) o;
+      return Objects.equals(start, that.start)
+          && Objects.equals(end, that.end)
+          && Objects.equals(typeDescriptor, that.typeDescriptor)
+          && Objects.equals(columnName, that.columnName);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(start, end, typeDescriptor, columnName);
+    }
+
+    @Override
+    public String toString() {
+      return "StartEndRange{"
+          + "start="
+          + start
+          + ", end="
+          + end
+          + ", typeDescriptor="
+          + typeDescriptor
+          + ", columnName='"
+          + columnName
+          + '\''
+          + '}';
+    }
+
+    T start() {
+      return start;
+    }
+
+    T end() {
+      return end;
+    }
+
+    String columnName() {
+      return columnName;
+    }
+
+    TypeDescriptor<T> type() {
+      return typeDescriptor;
+    }
+
+    KV<StartEndRange<T>, StartEndRange<T>> split(double fraction) {

Review comment:
       Not used as far as i can see

##########
File path: 
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/ReadSDF.java
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.jdbc;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.List;
+import javax.sql.DataSource;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.PreparedStatementSetter;
+import org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.JdbcReadWithPartitionsHelper;
+import org.apache.beam.sdk.io.jdbc.JdbcUtil.StartEndRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} executing the SQL query to read from the database. */
+@SuppressWarnings({
+  "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})

Review comment:
       Copy paste? Probably something that should be removed for new code ...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to