This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 35eafba  [BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
     new 5556871  Merge pull request #14918 from [BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
35eafba is described below

commit 35eafba53f4acf8ccde333d3d8b61f2e89bb850f
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Tue Jun 1 18:53:13 2021 +0300

    [BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
---
 .../org/apache/beam/sdk/io/jdbc/SchemaUtil.java    | 12 +++++++-
 .../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java    | 35 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
index 921fe82..0cc7c26 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.joda.time.DateTime;
@@ -197,10 +198,19 @@ class SchemaUtil {
     };
   }
 
-  /** Converts numeric fields with specified precision and scale. */
+  /**
+   * Converts numeric fields with specified precision and scale to {@link
+   * LogicalTypes.FixedPrecisionNumeric}. If a precision of numeric field is not specified, then
+   * converts such field to {@link FieldType#DECIMAL}.
+   */
   private static BeamFieldConverter beamLogicalNumericField(String identifier) {
     return (index, md) -> {
       int precision = md.getPrecision(index);
+      if (precision == Integer.MAX_VALUE || precision == -1) {
+        // If a precision is not specified, the column stores values as given (e.g. in Oracle DB)
+        return Schema.Field.of(md.getColumnLabel(index), FieldType.DECIMAL)
+            .withNullable(md.isNullable(index) == ResultSetMetaData.columnNullable);
+      }
       int scale = md.getScale(index);
       Schema.FieldType fieldType =
           Schema.FieldType.logicalType(
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 7754e2e..c0607d3 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.jdbc;
 
+import static java.sql.JDBCType.NUMERIC;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
@@ -62,7 +63,9 @@ import org.apache.beam.sdk.io.common.DatabaseTestHelper;
 import org.apache.beam.sdk.io.common.TestRow;
 import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration;
 import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider;
+import org.apache.beam.sdk.io.jdbc.LogicalTypes.FixedPrecisionNumeric;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.PAssert;
@@ -274,6 +277,38 @@ public class JdbcIOTest implements Serializable {
   }
 
   @Test
+  public void testReadRowsWithNumericFields() {
+    PCollection<Row> rows =
+        pipeline.apply(
+            JdbcIO.readRows()
+                .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+                .withQuery(
+                    String.format(
+                        "SELECT CAST(1 AS NUMERIC(1, 0)) AS T1 FROM %s WHERE name = ?",
+                        READ_TABLE_NAME))
+                .withStatementPreparator(
+                    preparedStatement ->
+                        preparedStatement.setString(1, TestRow.getNameForSeed(1))));
+
+    Schema expectedSchema =
+        Schema.of(
+            Schema.Field.of(
+                "T1",
+                FieldType.logicalType(FixedPrecisionNumeric.of(NUMERIC.getName(), 1, 0))
+                    .withNullable(false)));
+
+    assertEquals(expectedSchema, rows.getSchema());
+
+    PCollection<Row> output = rows.apply(Select.fieldNames("T1"));
+    PAssert.that(output)
+        .containsInAnyOrder(
+            ImmutableList.of(
+                Row.withSchema(expectedSchema).addValues(BigDecimal.valueOf(1)).build()));
+
+    pipeline.run();
+  }
+
+  @Test
   public void testReadRowsWithoutStatementPreparator() {
     SerializableFunction<Void, DataSource> dataSourceProvider = ignored -> DATA_SOURCE;
     String name = TestRow.getNameForSeed(1);

Reply via email to