This is an automated email from the ASF dual-hosted git repository.
libenchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 81902e81c43 [FLINK-32211][sql-client] Supports row format in executor
81902e81c43 is described below
commit 81902e81c43990c48d05ec81469b7d99c7922698
Author: Shammon FY <[email protected]>
AuthorDate: Mon May 29 15:01:29 2023 +0800
[FLINK-32211][sql-client] Supports row format in executor
In this commit, we also removed the DataConverter classes from the
flink-sql-jdbc-driver module.
Closes apache/flink#22671
---
.../flink/table/client/gateway/Executor.java | 9 ++
.../flink/table/client/gateway/ExecutorImpl.java | 37 +++++++-
.../apache/flink/table/jdbc/FlinkConnection.java | 4 +-
.../apache/flink/table/jdbc/FlinkResultSet.java | 50 +++++-----
.../apache/flink/table/jdbc/FlinkStatement.java | 5 +-
.../flink/table/jdbc/utils/DataConverter.java | 88 -----------------
.../table/jdbc/utils/DatabaseMetaDataUtils.java | 6 +-
.../table/jdbc/utils/DefaultDataConverter.java | 105 ---------------------
.../table/jdbc/utils/StringDataConverter.java | 105 ---------------------
.../flink/table/jdbc/FlinkResultSetTest.java | 47 ++++-----
10 files changed, 89 insertions(+), 367 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index b636d326560..3be128e0708 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.client.gateway;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import java.io.Closeable;
@@ -36,6 +37,14 @@ public interface Executor extends Closeable {
return new ExecutorImpl(defaultContext, address, sessionId);
}
+ static Executor create(
+ DefaultContext defaultContext,
+ InetSocketAddress address,
+ String sessionId,
+ RowFormat rowFormat) {
+ return new ExecutorImpl(defaultContext, address, sessionId, rowFormat);
+ }
+
static Executor create(DefaultContext defaultContext, URL address, String
sessionId) {
return new ExecutorImpl(defaultContext, address, sessionId);
}
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
index 4ec7eb3e8eb..f4f6b40ed56 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java
@@ -112,6 +112,7 @@ public class ExecutorImpl implements Executor {
private final SqlGatewayRestAPIVersion connectionVersion;
private final SessionHandle sessionHandle;
+ private final RowFormat rowFormat;
public ExecutorImpl(
DefaultContext defaultContext, InetSocketAddress gatewayAddress,
String sessionId) {
@@ -119,11 +120,30 @@ public class ExecutorImpl implements Executor {
defaultContext,
NetUtils.socketToUrl(gatewayAddress),
sessionId,
- HEARTBEAT_INTERVAL_MILLISECONDS);
+ HEARTBEAT_INTERVAL_MILLISECONDS,
+ RowFormat.PLAIN_TEXT);
+ }
+
+ public ExecutorImpl(
+ DefaultContext defaultContext,
+ InetSocketAddress gatewayAddress,
+ String sessionId,
+ RowFormat rowFormat) {
+ this(
+ defaultContext,
+ NetUtils.socketToUrl(gatewayAddress),
+ sessionId,
+ HEARTBEAT_INTERVAL_MILLISECONDS,
+ rowFormat);
}
public ExecutorImpl(DefaultContext defaultContext, URL gatewayUrl, String
sessionId) {
- this(defaultContext, gatewayUrl, sessionId,
HEARTBEAT_INTERVAL_MILLISECONDS);
+ this(
+ defaultContext,
+ gatewayUrl,
+ sessionId,
+ HEARTBEAT_INTERVAL_MILLISECONDS,
+ RowFormat.PLAIN_TEXT);
}
@VisibleForTesting
@@ -132,7 +152,12 @@ public class ExecutorImpl implements Executor {
InetSocketAddress gatewayAddress,
String sessionId,
long heartbeatInterval) {
- this(defaultContext, NetUtils.socketToUrl(gatewayAddress), sessionId,
heartbeatInterval);
+ this(
+ defaultContext,
+ NetUtils.socketToUrl(gatewayAddress),
+ sessionId,
+ heartbeatInterval,
+ RowFormat.PLAIN_TEXT);
}
@VisibleForTesting
@@ -140,9 +165,11 @@ public class ExecutorImpl implements Executor {
DefaultContext defaultContext,
URL gatewayUrl,
String sessionId,
- long heartbeatInterval) {
+ long heartbeatInterval,
+ RowFormat rowFormat) {
this.registry = new AutoCloseableRegistry();
this.gatewayUrl = gatewayUrl;
+ this.rowFormat = rowFormat;
try {
// register required resource
this.executorService = Executors.newCachedThreadPool();
@@ -433,7 +460,7 @@ public class ExecutorImpl implements Executor {
return sendRequest(
FetchResultsHeaders.getDefaultInstance(),
new FetchResultsMessageParameters(
- sessionHandle, operationHandle, token,
RowFormat.PLAIN_TEXT),
+ sessionHandle, operationHandle, token,
rowFormat),
EmptyRequestBody.getInstance())
.get();
} catch (InterruptedException e) {
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
index 8c0f1a237c6..7fdc99aeddf 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkConnection.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.jdbc;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.StatementResult;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.service.context.DefaultContext;
import org.apache.flink.table.jdbc.utils.DriverUtils;
@@ -59,7 +60,8 @@ public class FlinkConnection extends BaseConnection {
DriverUtils.fromProperties(driverUri.getProperties()),
Collections.emptyList()),
driverUri.getAddress(),
- UUID.randomUUID().toString());
+ UUID.randomUUID().toString(),
+ RowFormat.JSON);
driverUri.getCatalog().ifPresent(this::setSessionCatalog);
driverUri.getDatabase().ifPresent(this::setSessionSchema);
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
index 9bd6b6fe2f8..6b8306d4cee 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkResultSet.java
@@ -21,8 +21,8 @@ package org.apache.flink.table.jdbc;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.client.gateway.StatementResult;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.jdbc.utils.CloseableResultIterator;
-import org.apache.flink.table.jdbc.utils.DataConverter;
import org.apache.flink.table.jdbc.utils.StatementResultIterator;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.DecimalType;
@@ -52,30 +52,20 @@ public class FlinkResultSet extends BaseResultSet {
private final List<String> columnNameList;
private final Statement statement;
private final CloseableResultIterator<RowData> iterator;
- private final DataConverter dataConverter;
private final FlinkResultSetMetaData resultSetMetaData;
private RowData currentRow;
private boolean wasNull;
private volatile boolean closed;
- public FlinkResultSet(
- Statement statement, StatementResult result, DataConverter
dataConverter) {
- this(
- statement,
- new StatementResultIterator(result),
- result.getResultSchema(),
- dataConverter);
+ public FlinkResultSet(Statement statement, StatementResult result) {
+ this(statement, new StatementResultIterator(result),
result.getResultSchema());
}
public FlinkResultSet(
- Statement statement,
- CloseableResultIterator<RowData> iterator,
- ResolvedSchema schema,
- DataConverter dataConverter) {
+ Statement statement, CloseableResultIterator<RowData> iterator,
ResolvedSchema schema) {
this.statement = checkNotNull(statement, "Statement cannot be null");
this.iterator = checkNotNull(iterator, "Statement result cannot be
null");
- this.dataConverter = checkNotNull(dataConverter, "Data converter
cannot be null");
this.currentRow = null;
this.wasNull = false;
@@ -155,8 +145,9 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
+ StringData stringData = currentRow.getString(columnIndex - 1);
try {
- return dataConverter.getString(currentRow, columnIndex - 1);
+ return stringData == null ? null : stringData.toString();
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -168,7 +159,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getBoolean(currentRow, columnIndex - 1);
+ return !currentRow.isNullAt(columnIndex - 1) &&
currentRow.getBoolean(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -180,7 +171,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getByte(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0 :
currentRow.getByte(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -192,7 +183,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getShort(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0 :
currentRow.getShort(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -204,7 +195,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getInt(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0 :
currentRow.getInt(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -217,7 +208,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidColumn(columnIndex);
try {
- return dataConverter.getLong(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0L :
currentRow.getLong(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -229,7 +220,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getFloat(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0 :
currentRow.getFloat(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -241,7 +232,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getDouble(currentRow, columnIndex - 1);
+ return currentRow.isNullAt(columnIndex - 1) ? 0 :
currentRow.getDouble(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -258,7 +249,7 @@ public class FlinkResultSet extends BaseResultSet {
checkValidRow();
checkValidColumn(columnIndex);
try {
- return dataConverter.getBinary(currentRow, columnIndex - 1);
+ return currentRow.getBinary(columnIndex - 1);
} catch (Exception e) {
throw new SQLDataException(e);
}
@@ -392,11 +383,14 @@ public class FlinkResultSet extends BaseResultSet {
}
DecimalType decimalType = (DecimalType) dataType.getLogicalType();
try {
- return dataConverter.getDecimal(
- currentRow,
- columnIndex - 1,
- decimalType.getPrecision(),
- decimalType.getScale());
+ return currentRow.isNullAt(columnIndex - 1)
+ ? null
+ : currentRow
+ .getDecimal(
+ columnIndex - 1,
+ decimalType.getPrecision(),
+ decimalType.getScale())
+ .toBigDecimal();
} catch (Exception e) {
throw new SQLDataException(e);
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
index 9400578b2dd..f0394bdda1a 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/FlinkStatement.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.jdbc;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.StatementResult;
-import org.apache.flink.table.jdbc.utils.StringDataConverter;
import javax.annotation.concurrent.NotThreadSafe;
@@ -58,7 +57,7 @@ public class FlinkStatement extends BaseStatement {
if (!result.isQueryResult()) {
throw new SQLException(String.format("Statement[%s] is not a
query.", sql));
}
- currentResults = new FlinkResultSet(this, result,
StringDataConverter.CONVERTER);
+ currentResults = new FlinkResultSet(this, result);
return currentResults;
}
@@ -106,7 +105,7 @@ public class FlinkStatement extends BaseStatement {
public boolean execute(String sql) throws SQLException {
StatementResult result = executeInternal(sql);
if (result.isQueryResult() || result.getResultKind() ==
ResultKind.SUCCESS_WITH_CONTENT) {
- currentResults = new FlinkResultSet(this, result,
StringDataConverter.CONVERTER);
+ currentResults = new FlinkResultSet(this, result);
return true;
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
deleted file mode 100644
index 1709932c8c1..00000000000
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DataConverter.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.TimestampData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Convert data from row data for result set. */
-public interface DataConverter {
-
- /** Returns the boolean value at the given position. */
- boolean getBoolean(RowData rowData, int pos);
-
- /** Returns the byte value at the given position. */
- byte getByte(RowData rowData, int pos);
-
- /** Returns the short value at the given position. */
- short getShort(RowData rowData, int pos);
-
- /** Returns the integer value at the given position. */
- int getInt(RowData rowData, int pos);
-
- /** Returns the long value at the given position. */
- long getLong(RowData rowData, int pos);
-
- /** Returns the float value at the given position. */
- float getFloat(RowData rowData, int pos);
-
- /** Returns the double value at the given position. */
- double getDouble(RowData rowData, int pos);
-
- /** Returns the string value at the given position. */
- String getString(RowData rowData, int pos);
-
- /**
- * Returns the decimal value at the given position.
- *
- * <p>The precision and scale are required to determine whether the
decimal value was stored in
- * a compact representation (see {@link DecimalData}).
- */
- BigDecimal getDecimal(RowData rowData, int pos, int precision, int scale);
-
- /**
- * Returns the timestamp value at the given position.
- *
- * <p>The precision is required to determine whether the timestamp value
was stored in a compact
- * representation (see {@link TimestampData}).
- */
- Timestamp getTimestamp(RowData rowData, int pos, int precision);
-
- /** Returns the binary value at the given position. */
- byte[] getBinary(RowData rowData, int pos);
-
- /** Returns the array value at the given position. */
- Array getArray(RowData rowData, int pos);
-
- /** Returns the map value at the given position. */
- Map<?, ?> getMap(RowData rowData, int pos);
-
- /**
- * Returns the row value at the given position.
- *
- * <p>The number of fields is required to correctly extract the row.
- */
- RowData getRow(RowData rowData, int pos, int numFields);
-}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
index b99bd54a30f..11a91c416cb 100644
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
+++
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DatabaseMetaDataUtils.java
@@ -66,8 +66,7 @@ public class DatabaseMetaDataUtils {
return new FlinkResultSet(
statement,
new CollectionResultIterator(catalogs.iterator()),
- ResolvedSchema.of(TABLE_CAT_COLUMN),
- StringDataConverter.CONVERTER);
+ ResolvedSchema.of(TABLE_CAT_COLUMN));
}
/**
@@ -104,7 +103,6 @@ public class DatabaseMetaDataUtils {
return new FlinkResultSet(
statement,
new CollectionResultIterator(schemaWithCatalogList.iterator()),
- ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN),
- StringDataConverter.CONVERTER);
+ ResolvedSchema.of(TABLE_SCHEM_COLUMN, TABLE_CATALOG_COLUMN));
}
}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
deleted file mode 100644
index c5b65092df2..00000000000
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/DefaultDataConverter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.RowData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Default data converter for result set. */
-public class DefaultDataConverter implements DataConverter {
- public static final DataConverter CONVERTER = new DefaultDataConverter();
-
- private DefaultDataConverter() {}
-
- @Override
- public boolean getBoolean(RowData rowData, int pos) {
- return !rowData.isNullAt(pos) && rowData.getBoolean(pos);
- }
-
- @Override
- public byte getByte(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getByte(pos);
- }
-
- @Override
- public short getShort(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getShort(pos);
- }
-
- @Override
- public int getInt(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getInt(pos);
- }
-
- @Override
- public long getLong(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getLong(pos);
- }
-
- @Override
- public float getFloat(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getFloat(pos);
- }
-
- @Override
- public double getDouble(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : rowData.getDouble(pos);
- }
-
- @Override
- public String getString(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? null :
rowData.getString(pos).toString();
- }
-
- @Override
- public BigDecimal getDecimal(RowData rowData, int pos, int precision, int
scale) {
- return rowData.isNullAt(pos)
- ? null
- : rowData.getDecimal(pos, precision, scale).toBigDecimal();
- }
-
- @Override
- public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
- return rowData.isNullAt(pos) ? null : rowData.getTimestamp(pos,
precision).toTimestamp();
- }
-
- @Override
- public byte[] getBinary(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? null : rowData.getBinary(pos);
- }
-
- @Override
- public Array getArray(RowData rowData, int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<?, ?> getMap(RowData rowData, int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public RowData getRow(RowData rowData, int pos, int numFields) {
- return rowData.getRow(pos, numFields);
- }
-}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
b/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
deleted file mode 100644
index cd8e8e0f1a3..00000000000
---
a/flink-table/flink-sql-jdbc-driver/src/main/java/org/apache/flink/table/jdbc/utils/StringDataConverter.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.jdbc.utils;
-
-import org.apache.flink.table.data.RowData;
-
-import java.math.BigDecimal;
-import java.sql.Array;
-import java.sql.Timestamp;
-import java.util.Map;
-
-/** Converter string value to different value. */
-public class StringDataConverter implements DataConverter {
- public static final DataConverter CONVERTER = new StringDataConverter();
-
- private StringDataConverter() {}
-
- @Override
- public boolean getBoolean(RowData rowData, int pos) {
- return Boolean.parseBoolean(getString(rowData, pos));
- }
-
- @Override
- public byte getByte(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : Byte.parseByte(getString(rowData,
pos));
- }
-
- @Override
- public short getShort(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : Short.parseShort(getString(rowData,
pos));
- }
-
- @Override
- public int getInt(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : Integer.parseInt(getString(rowData,
pos));
- }
-
- @Override
- public long getLong(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : Long.parseLong(getString(rowData,
pos));
- }
-
- @Override
- public float getFloat(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 : Float.parseFloat(getString(rowData,
pos));
- }
-
- @Override
- public double getDouble(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? 0 :
Double.parseDouble(getString(rowData, pos));
- }
-
- @Override
- public String getString(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? null :
rowData.getString(pos).toString();
- }
-
- @Override
- public BigDecimal getDecimal(RowData rowData, int pos, int precision, int
scale) {
- return rowData.isNullAt(pos)
- ? null
- : new BigDecimal(getString(rowData, pos)).setScale(scale);
- }
-
- @Override
- public byte[] getBinary(RowData rowData, int pos) {
- return rowData.isNullAt(pos) ? null : rowData.getString(pos).toBytes();
- }
-
- @Override
- public Timestamp getTimestamp(RowData rowData, int pos, int precision) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Array getArray(RowData rowData, int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Map<?, ?> getMap(RowData rowData, int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public RowData getRow(RowData rowData, int pos, int numFields) {
- throw new UnsupportedOperationException();
- }
-}
diff --git
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
index 6c68885d18d..88e7756e1d4 100644
---
a/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
+++
b/flink-table/flink-sql-jdbc-driver/src/test/java/org/apache/flink/table/jdbc/FlinkResultSetTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.jdbc.utils.DefaultDataConverter;
-import org.apache.flink.table.jdbc.utils.StringDataConverter;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
@@ -38,7 +36,6 @@ import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLDataException;
import java.sql.SQLException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.stream.IntStream;
@@ -93,8 +90,7 @@ public class FlinkResultSetTest {
new FlinkResultSet(
new TestingStatement(),
new StatementResult(
- SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()),
- DefaultDataConverter.CONVERTER)) {
+ SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()))) {
validateResultData(resultSet);
}
}
@@ -107,27 +103,28 @@ public class FlinkResultSetTest {
.boxed()
.map(
v ->
- stringRowData(
- v % 2 == 0,
- v.byteValue(),
- v.shortValue(),
- v,
- v.longValue(),
- (float) (v + 0.1),
- v + 0.22,
-
DecimalData.fromBigDecimal(
- new
BigDecimal(v + ".55555"),
- 10,
- 5),
-
StringData.fromString(v.toString()),
- v.toString()))
+ (RowData)
+ GenericRowData.of(
+ v % 2 == 0,
+ v.byteValue(),
+ v.shortValue(),
+ v,
+ v.longValue(),
+ (float) (v +
0.1),
+ v + 0.22,
+
DecimalData.fromBigDecimal(
+ new
BigDecimal(
+
v + ".55555"),
+ 10,
+ 5),
+
StringData.fromString(v.toString()),
+
v.toString().getBytes()))
.iterator());
try (ResultSet resultSet =
new FlinkResultSet(
new TestingStatement(),
new StatementResult(
- SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()),
- StringDataConverter.CONVERTER)) {
+ SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()))) {
validateResultData(resultSet);
}
}
@@ -146,8 +143,7 @@ public class FlinkResultSetTest {
new FlinkResultSet(
new TestingStatement(),
new StatementResult(
- SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()),
- StringDataConverter.CONVERTER)) {
+ SCHEMA, data, true, ResultKind.SUCCESS,
JobID.generate()))) {
assertTrue(resultSet.next());
assertFalse(resultSet.getBoolean(1));
assertEquals((byte) 0, resultSet.getByte(2));
@@ -163,11 +159,6 @@ public class FlinkResultSetTest {
}
}
- private RowData stringRowData(Object... values) {
- return GenericRowData.of(
- Arrays.stream(values).map(v ->
StringData.fromString(v.toString())).toArray());
- }
-
private static void validateResultData(ResultSet resultSet) throws
SQLException {
int resultCount = 0;
while (resultSet.next()) {