This is an automated email from the ASF dual-hosted git repository.

libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9538fdaab29 [FLINK-31741][jdbc-driver] Support data converter for 
value in statement result
9538fdaab29 is described below

commit 9538fdaab2948a2e3dd068925d936ac0777301de
Author: shammon FY <[email protected]>
AuthorDate: Thu Apr 6 11:34:12 2023 +0800

    [FLINK-31741][jdbc-driver] Support data converter for value in statement 
result
    
    Close apache/flink#22360
---
 .../apache/flink/table/jdbc/FlinkResultSet.java    |  35 +--
 .../flink/table/jdbc/utils/DataConverter.java      |  88 ++++++++
 .../table/jdbc/utils/DefaultDataConverter.java     | 105 +++++++++
 .../table/jdbc/utils/StringDataConverter.java      | 105 +++++++++
 .../flink/table/jdbc/FlinkResultSetTest.java       | 238 +++++++++++++--------
 5 files changed, 463 insertions(+), 108 deletions(-)

diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index cfb7ebc3f09..6c8072b60ba 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,7 +21,7 @@ package org.apache.flink.table.jdbc;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.client.gateway.StatementResult;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.utils.DataConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.DecimalType;
 
@@ -50,14 +50,17 @@ public class FlinkResultSet extends BaseResultSet {
     private final List<String> columnNameList;
     private final Statement statement;
     private final StatementResult result;
+    private final DataConverter dataConverter;
     private RowData currentRow;
     private boolean wasNull;
 
     private volatile boolean closed;
 
-    public FlinkResultSet(Statement statement, StatementResult result) {
+    public FlinkResultSet(
+            Statement statement, StatementResult result, DataConverter 
dataConverter) {
         this.statement = checkNotNull(statement, "Statement cannot be null");
         this.result = checkNotNull(result, "Statement result cannot be null");
+        this.dataConverter = checkNotNull(dataConverter, "Data converter 
cannot be null");
         this.currentRow = null;
         this.wasNull = false;
 
@@ -133,9 +136,8 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
 
-        StringData stringData = currentRow.getString(columnIndex - 1);
         try {
-            return stringData == null ? null : stringData.toString();
+            return dataConverter.getString(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -147,8 +149,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getBoolean(columnIndex - 1);
-
+            return dataConverter.getBoolean(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -160,7 +161,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getByte(columnIndex - 1);
+            return dataConverter.getByte(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -172,7 +173,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getShort(columnIndex - 1);
+            return dataConverter.getShort(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -184,7 +185,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getInt(columnIndex - 1);
+            return dataConverter.getInt(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -197,7 +198,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidColumn(columnIndex);
 
         try {
-            return currentRow.getLong(columnIndex - 1);
+            return dataConverter.getLong(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -209,7 +210,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getFloat(columnIndex - 1);
+            return dataConverter.getFloat(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -221,7 +222,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getDouble(columnIndex - 1);
+            return dataConverter.getDouble(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -238,7 +239,7 @@ public class FlinkResultSet extends BaseResultSet {
         checkValidRow();
         checkValidColumn(columnIndex);
         try {
-            return currentRow.getBinary(columnIndex - 1);
+            return dataConverter.getBinary(currentRow, columnIndex - 1);
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
@@ -373,9 +374,11 @@ public class FlinkResultSet extends BaseResultSet {
         }
         DecimalType decimalType = (DecimalType) dataType.getLogicalType();
         try {
-            return currentRow
-                    .getDecimal(columnIndex - 1, decimalType.getPrecision(), 
decimalType.getScale())
-                    .toBigDecimal();
+            return dataConverter.getDecimal(
+                    currentRow,
+                    columnIndex - 1,
+                    decimalType.getPrecision(),
+                    decimalType.getScale());
         } catch (Exception e) {
             throw new SQLDataException(e);
         }
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
new file mode 100644
index 00000000000..1709932c8c1
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Timestamp;
+import java.util.Map;
+
+/** Convert data from row data for result set. */
+public interface DataConverter {
+
+    /** Returns the boolean value at the given position. */
+    boolean getBoolean(RowData rowData, int pos);
+
+    /** Returns the byte value at the given position. */
+    byte getByte(RowData rowData, int pos);
+
+    /** Returns the short value at the given position. */
+    short getShort(RowData rowData, int pos);
+
+    /** Returns the integer value at the given position. */
+    int getInt(RowData rowData, int pos);
+
+    /** Returns the long value at the given position. */
+    long getLong(RowData rowData, int pos);
+
+    /** Returns the float value at the given position. */
+    float getFloat(RowData rowData, int pos);
+
+    /** Returns the double value at the given position. */
+    double getDouble(RowData rowData, int pos);
+
+    /** Returns the string value at the given position. */
+    String getString(RowData rowData, int pos);
+
+    /**
+     * Returns the decimal value at the given position.
+     *
+     * <p>The precision and scale are required to determine whether the 
decimal value was stored in
+     * a compact representation (see {@link DecimalData}).
+     */
+    BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale);
+
+    /**
+     * Returns the timestamp value at the given position.
+     *
+     * <p>The precision is required to determine whether the timestamp value 
was stored in a compact
+     * representation (see {@link TimestampData}).
+     */
+    Timestamp getTimestamp(RowData rowData, int pos, int precision);
+
+    /** Returns the binary value at the given position. */
+    byte[] getBinary(RowData rowData, int pos);
+
+    /** Returns the array value at the given position. */
+    Array getArray(RowData rowData, int pos);
+
+    /** Returns the map value at the given position. */
+    Map<?, ?> getMap(RowData rowData, int pos);
+
+    /**
+     * Returns the row value at the given position.
+     *
+     * <p>The number of fields is required to correctly extract the row.
+     */
+    RowData getRow(RowData rowData, int pos, int numFields);
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
new file mode 100644
index 00000000000..c5b65092df2
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Timestamp;
+import java.util.Map;
+
+/** Default data converter for result set. */
+public class DefaultDataConverter implements DataConverter {
+    public static final DataConverter CONVERTER = new DefaultDataConverter();
+
+    private DefaultDataConverter() {}
+
+    @Override
+    public boolean getBoolean(RowData rowData, int pos) {
+        return !rowData.isNullAt(pos) && rowData.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getByte(pos);
+    }
+
+    @Override
+    public short getShort(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getShort(pos);
+    }
+
+    @Override
+    public int getInt(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getInt(pos);
+    }
+
+    @Override
+    public long getLong(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : rowData.getDouble(pos);
+    }
+
+    @Override
+    public String getString(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? null : 
rowData.getString(pos).toString();
+    }
+
+    @Override
+    public BigDecimal getDecimal(RowData rowData, int pos, int precision, int 
scale) {
+        return rowData.isNullAt(pos)
+                ? null
+                : rowData.getDecimal(pos, precision, scale).toBigDecimal();
+    }
+
+    @Override
+    public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
+        return rowData.isNullAt(pos) ? null : rowData.getTimestamp(pos, 
precision).toTimestamp();
+    }
+
+    @Override
+    public byte[] getBinary(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? null : rowData.getBinary(pos);
+    }
+
+    @Override
+    public Array getArray(RowData rowData, int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<?, ?> getMap(RowData rowData, int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RowData getRow(RowData rowData, int pos, int numFields) {
+        return rowData.getRow(pos, numFields);
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
new file mode 100644
index 00000000000..cd8e8e0f1a3
--- /dev/null
+++ 
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.jdbc.utils;
+
+import org.apache.flink.table.data.RowData;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Timestamp;
+import java.util.Map;
+
+/** Converter string value to different value. */
+public class StringDataConverter implements DataConverter {
+    public static final DataConverter CONVERTER = new StringDataConverter();
+
+    private StringDataConverter() {}
+
+    @Override
+    public boolean getBoolean(RowData rowData, int pos) {
+        return Boolean.parseBoolean(getString(rowData, pos));
+    }
+
+    @Override
+    public byte getByte(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : Byte.parseByte(getString(rowData, 
pos));
+    }
+
+    @Override
+    public short getShort(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : Short.parseShort(getString(rowData, 
pos));
+    }
+
+    @Override
+    public int getInt(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : Integer.parseInt(getString(rowData, 
pos));
+    }
+
+    @Override
+    public long getLong(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : Long.parseLong(getString(rowData, 
pos));
+    }
+
+    @Override
+    public float getFloat(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : Float.parseFloat(getString(rowData, 
pos));
+    }
+
+    @Override
+    public double getDouble(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? 0 : 
Double.parseDouble(getString(rowData, pos));
+    }
+
+    @Override
+    public String getString(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? null : 
rowData.getString(pos).toString();
+    }
+
+    @Override
+    public BigDecimal getDecimal(RowData rowData, int pos, int precision, int 
scale) {
+        return rowData.isNullAt(pos)
+                ? null
+                : new BigDecimal(getString(rowData, pos)).setScale(scale);
+    }
+
+    @Override
+    public byte[] getBinary(RowData rowData, int pos) {
+        return rowData.isNullAt(pos) ? null : rowData.getString(pos).toBytes();
+    }
+
+    @Override
+    public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Array getArray(RowData rowData, int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<?, ?> getMap(RowData rowData, int pos) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RowData getRow(RowData rowData, int pos, int numFields) {
+        throw new UnsupportedOperationException();
+    }
+}
diff --git 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
index 2e51cf247d6..6c68885d18d 100644
--- 
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
+++ 
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
@@ -28,6 +28,8 @@ import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.jdbc.utils.DefaultDataConverter;
+import org.apache.flink.table.jdbc.utils.StringDataConverter;
 import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.Test;
@@ -36,15 +38,31 @@ import java.math.BigDecimal;
 import java.sql.ResultSet;
 import java.sql.SQLDataException;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrowsExactly;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link FlinkResultSet}. */
 public class FlinkResultSetTest {
     private static final int RECORD_SIZE = 5000;
+    private static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("v1", DataTypes.BOOLEAN()),
+                    Column.physical("v2", DataTypes.TINYINT()),
+                    Column.physical("v3", DataTypes.SMALLINT()),
+                    Column.physical("v4", DataTypes.INT()),
+                    Column.physical("v5", DataTypes.BIGINT()),
+                    Column.physical("v6", DataTypes.FLOAT()),
+                    Column.physical("v7", DataTypes.DOUBLE()),
+                    Column.physical("v8", DataTypes.DECIMAL(10, 5)),
+                    Column.physical("v9", DataTypes.STRING()),
+                    Column.physical("v10", DataTypes.BYTES()));
 
     @Test
     public void testResultSetPrimitiveData() throws Exception {
@@ -71,105 +89,141 @@ public class FlinkResultSetTest {
                                                                 
StringData.fromString(v.toString()),
                                                                 
v.toString().getBytes()))
                                 .iterator());
-        int resultCount = 0;
         try (ResultSet resultSet =
                 new FlinkResultSet(
                         new TestingStatement(),
                         new StatementResult(
-                                ResolvedSchema.of(
-                                        Column.physical("v1", 
DataTypes.BOOLEAN()),
-                                        Column.physical("v2", 
DataTypes.TINYINT()),
-                                        Column.physical("v3", 
DataTypes.SMALLINT()),
-                                        Column.physical("v4", DataTypes.INT()),
-                                        Column.physical("v5", 
DataTypes.BIGINT()),
-                                        Column.physical("v6", 
DataTypes.FLOAT()),
-                                        Column.physical("v7", 
DataTypes.DOUBLE()),
-                                        Column.physical("v8", 
DataTypes.DECIMAL(10, 5)),
-                                        Column.physical("v9", 
DataTypes.STRING()),
-                                        Column.physical("v10", 
DataTypes.BYTES())),
-                                data,
-                                true,
-                                ResultKind.SUCCESS,
-                                JobID.generate()))) {
-            while (resultSet.next()) {
-                Integer val = resultSet.getInt("v4");
-                assertEquals(val, resultCount);
-                resultCount++;
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
+                        DefaultDataConverter.CONVERTER)) {
+            validateResultData(resultSet);
+        }
+    }
+
+    @Test
+    public void testStringResultSetPrimitiveData() throws Exception {
+        CloseableIterator<RowData> data =
+                CloseableIterator.adapterForIterator(
+                        IntStream.range(0, RECORD_SIZE)
+                                .boxed()
+                                .map(
+                                        v ->
+                                                stringRowData(
+                                                        v % 2 == 0,
+                                                        v.byteValue(),
+                                                        v.shortValue(),
+                                                        v,
+                                                        v.longValue(),
+                                                        (float) (v + 0.1),
+                                                        v + 0.22,
+                                                        
DecimalData.fromBigDecimal(
+                                                                new 
BigDecimal(v + ".55555"),
+                                                                10,
+                                                                5),
+                                                        
StringData.fromString(v.toString()),
+                                                        v.toString()))
+                                .iterator());
+        try (ResultSet resultSet =
+                new FlinkResultSet(
+                        new TestingStatement(),
+                        new StatementResult(
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
+                        StringDataConverter.CONVERTER)) {
+            validateResultData(resultSet);
+        }
+    }
+
+    @Test
+    public void testStringResultSetNullData() throws Exception {
+        CloseableIterator<RowData> data =
+                CloseableIterator.adapterForIterator(
+                        Collections.singletonList(
+                                        (RowData)
+                                                GenericRowData.of(
+                                                        null, null, null, 
null, null, null, null,
+                                                        null, null, null))
+                                .iterator());
+        try (ResultSet resultSet =
+                new FlinkResultSet(
+                        new TestingStatement(),
+                        new StatementResult(
+                                SCHEMA, data, true, ResultKind.SUCCESS, 
JobID.generate()),
+                        StringDataConverter.CONVERTER)) {
+            assertTrue(resultSet.next());
+            assertFalse(resultSet.getBoolean(1));
+            assertEquals((byte) 0, resultSet.getByte(2));
+            assertEquals((short) 0, resultSet.getShort(3));
+            assertEquals(0, resultSet.getInt(4));
+            assertEquals(0L, resultSet.getLong(5));
+            assertEquals((float) 0.0, resultSet.getFloat(6));
+            assertEquals(0.0, resultSet.getDouble(7));
+            assertNull(resultSet.getBigDecimal(8));
+            assertNull(resultSet.getString(9));
+            assertNull(resultSet.getBytes(10));
+            assertFalse(resultSet.next());
+        }
+    }
+
+    private RowData stringRowData(Object... values) {
+        return GenericRowData.of(
+                Arrays.stream(values).map(v -> 
StringData.fromString(v.toString())).toArray());
+    }
+
+    private static void validateResultData(ResultSet resultSet) throws 
SQLException {
+        int resultCount = 0;
+        while (resultSet.next()) {
+            Integer val = resultSet.getInt("v4");
+            assertEquals(val, resultCount);
+            resultCount++;
 
-                // Get and validate each column value
-                assertEquals(val % 2 == 0, resultSet.getBoolean(1));
-                assertEquals(val % 2 == 0, resultSet.getBoolean("v1"));
-                assertEquals(val.byteValue(), resultSet.getByte(2));
-                assertEquals(val.byteValue(), resultSet.getByte("v2"));
-                assertEquals(val.shortValue(), resultSet.getShort(3));
-                assertEquals(val.shortValue(), resultSet.getShort("v3"));
-                assertEquals(val, resultSet.getInt(4));
-                assertEquals(val, resultSet.getInt("v4"));
-                assertEquals(val.longValue(), resultSet.getLong(5));
-                assertEquals(val.longValue(), resultSet.getLong("v5"));
-                assertTrue(resultSet.getFloat(6) - val - 0.1 < 0.0001);
-                assertTrue(resultSet.getFloat("v6") - val - 0.1 < 0.0001);
-                assertTrue(resultSet.getDouble(7) - val - 0.22 < 0.0001);
-                assertTrue(resultSet.getDouble("v7") - val - 0.22 < 0.0001);
-                assertEquals(new BigDecimal(val + ".55555"), 
resultSet.getBigDecimal(8));
-                assertEquals(new BigDecimal(val + ".55555"), 
resultSet.getBigDecimal("v8"));
-                assertEquals(val.toString(), resultSet.getString(9));
-                assertEquals(val.toString(), resultSet.getString("v9"));
-                assertEquals(val.toString(), new 
String(resultSet.getBytes(10)));
-                assertEquals(val.toString(), new 
String(resultSet.getBytes("v10")));
+            // Get and validate each column value
+            assertEquals(val % 2 == 0, resultSet.getBoolean(1));
+            assertEquals(val % 2 == 0, resultSet.getBoolean("v1"));
+            assertEquals(val.byteValue(), resultSet.getByte(2));
+            assertEquals(val.byteValue(), resultSet.getByte("v2"));
+            assertEquals(val.shortValue(), resultSet.getShort(3));
+            assertEquals(val.shortValue(), resultSet.getShort("v3"));
+            assertEquals(val, resultSet.getInt(4));
+            assertEquals(val, resultSet.getInt("v4"));
+            assertEquals(val.longValue(), resultSet.getLong(5));
+            assertEquals(val.longValue(), resultSet.getLong("v5"));
+            assertTrue(resultSet.getFloat(6) - val - 0.1 < 0.0001);
+            assertTrue(resultSet.getFloat("v6") - val - 0.1 < 0.0001);
+            assertTrue(resultSet.getDouble(7) - val - 0.22 < 0.0001);
+            assertTrue(resultSet.getDouble("v7") - val - 0.22 < 0.0001);
+            assertEquals(new BigDecimal(val + ".55555"), 
resultSet.getBigDecimal(8));
+            assertEquals(new BigDecimal(val + ".55555"), 
resultSet.getBigDecimal("v8"));
+            assertEquals(val.toString(), resultSet.getString(9));
+            assertEquals(val.toString(), resultSet.getString("v9"));
+            assertEquals(val.toString(), new String(resultSet.getBytes(10)));
+            assertEquals(val.toString(), new 
String(resultSet.getBytes("v10")));
 
-                // Get data according to wrong data type
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(1),
-                        "java.lang.ClassCastException: java.lang.Boolean 
cannot be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(2),
-                        "java.lang.ClassCastException: java.lang.Byte cannot 
be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(3),
-                        "java.lang.ClassCastException: java.lang.Short cannot 
be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(4),
-                        "java.lang.ClassCastException: java.lang.Integer 
cannot be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getInt(5),
-                        "java.lang.ClassCastException: java.lang.Long cannot 
be cast to java.lang.Int");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(6),
-                        "java.lang.ClassCastException: java.lang.Float cannot 
be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(7),
-                        "java.lang.ClassCastException: java.lang.Double cannot 
be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(8),
-                        "java.lang.ClassCastException: java.lang.BigDecimal 
cannot be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(9),
-                        "java.lang.ClassCastException: java.lang.String cannot 
be cast to java.lang.Long");
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong(10),
-                        "java.lang.ClassCastException: java.lang.byte[] cannot 
be cast to java.lang.Long");
+            // Get data according to wrong data type
+            assertThrowsExactly(
+                    SQLDataException.class,
+                    () -> resultSet.getLong(1),
+                    "java.lang.ClassCastException: java.lang.Boolean cannot be 
cast to java.lang.Long");
+            assertThrowsExactly(
+                    SQLDataException.class,
+                    () -> resultSet.getLong(6),
+                    "java.lang.ClassCastException: java.lang.Float cannot be 
cast to java.lang.Long");
+            assertThrowsExactly(
+                    SQLDataException.class,
+                    () -> resultSet.getLong(7),
+                    "java.lang.ClassCastException: java.lang.Double cannot be 
cast to java.lang.Long");
+            assertThrowsExactly(
+                    SQLDataException.class,
+                    () -> resultSet.getLong(8),
+                    "java.lang.ClassCastException: java.lang.BigDecimal cannot 
be cast to java.lang.Long");
 
-                // Get not exist column
-                assertThrowsExactly(
-                        SQLDataException.class,
-                        () -> resultSet.getLong("id1"),
-                        "Column[id1] is not exist");
-                assertThrowsExactly(
-                        SQLException.class, () -> resultSet.getLong(11), 
"Column[11] is not exist");
-                assertThrowsExactly(
-                        SQLException.class, () -> resultSet.getLong(-1), 
"Column[-1] is not exist");
-            }
+            // Get not exist column
+            assertThrowsExactly(
+                    SQLDataException.class,
+                    () -> resultSet.getLong("id1"),
+                    "Column[id1] is not exist");
+            assertThrowsExactly(
+                    SQLException.class, () -> resultSet.getLong(11), 
"Column[11] is not exist");
+            assertThrowsExactly(
+                    SQLException.class, () -> resultSet.getLong(-1), 
"Column[-1] is not exist");
         }
         assertEquals(resultCount, RECORD_SIZE);
     }

Reply via email to