This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 35eafba [BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
new 5556871 Merge pull request #14918 from [BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
35eafba is described below
commit 35eafba53f4acf8ccde333d3d8b61f2e89bb850f
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Tue Jun 1 18:53:13 2021 +0300
[BEAM-8897] Fix reading Rows with Numeric fields from a JDBC data source
---
.../org/apache/beam/sdk/io/jdbc/SchemaUtil.java | 12 +++++++-
.../org/apache/beam/sdk/io/jdbc/JdbcIOTest.java | 35 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
index 921fe82..0cc7c26 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/SchemaUtil.java
@@ -53,6 +53,7 @@ import java.util.stream.IntStream;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;
@@ -197,10 +198,19 @@ class SchemaUtil {
};
}
- /** Converts numeric fields with specified precision and scale. */
+ /**
+ * Converts numeric fields with specified precision and scale to {@link
+ * LogicalTypes.FixedPrecisionNumeric}. If a precision of numeric field is not specified, then
+ * converts such field to {@link FieldType#DECIMAL}.
+ */
private static BeamFieldConverter beamLogicalNumericField(String identifier) {
return (index, md) -> {
int precision = md.getPrecision(index);
+ if (precision == Integer.MAX_VALUE || precision == -1) {
+ // If a precision is not specified, the column stores values as given (e.g. in Oracle DB)
+ return Schema.Field.of(md.getColumnLabel(index), FieldType.DECIMAL)
+ .withNullable(md.isNullable(index) == ResultSetMetaData.columnNullable);
+ }
int scale = md.getScale(index);
Schema.FieldType fieldType =
Schema.FieldType.logicalType(
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
index 7754e2e..c0607d3 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/JdbcIOTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.jdbc;
+import static java.sql.JDBCType.NUMERIC;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
@@ -62,7 +63,9 @@ import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.jdbc.JdbcIO.PoolableDataSourceProvider;
+import org.apache.beam.sdk.io.jdbc.LogicalTypes.FixedPrecisionNumeric;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
@@ -274,6 +277,38 @@ public class JdbcIOTest implements Serializable {
}
@Test
+ public void testReadRowsWithNumericFields() {
+ PCollection<Row> rows =
+ pipeline.apply(
+ JdbcIO.readRows()
+ .withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION)
+ .withQuery(
+ String.format(
+ "SELECT CAST(1 AS NUMERIC(1, 0)) AS T1 FROM %s WHERE name = ?",
+ READ_TABLE_NAME))
+ .withStatementPreparator(
+ preparedStatement ->
+ preparedStatement.setString(1, TestRow.getNameForSeed(1))));
+
+ Schema expectedSchema =
+ Schema.of(
+ Schema.Field.of(
+ "T1",
+ FieldType.logicalType(FixedPrecisionNumeric.of(NUMERIC.getName(), 1, 0))
+ .withNullable(false)));
+
+ assertEquals(expectedSchema, rows.getSchema());
+
+ PCollection<Row> output = rows.apply(Select.fieldNames("T1"));
+ PAssert.that(output)
+ .containsInAnyOrder(
+ ImmutableList.of(
+ Row.withSchema(expectedSchema).addValues(BigDecimal.valueOf(1)).build()));
+
+ pipeline.run();
+ }
+
+ @Test
public void testReadRowsWithoutStatementPreparator() {
SerializableFunction<Void, DataSource> dataSourceProvider = ignored -> DATA_SOURCE;
String name = TestRow.getNameForSeed(1);