lsyldliu commented on code in PR #25889:
URL: https://github.com/apache/flink/pull/25889#discussion_r1904826905
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##########
@@ -1386,8 +1386,8 @@ private void registerTableSourceInternal(String name, TableSource<?> tableSource
         }
     }

-    @Override
-    public void registerTableSinkInternal(String name, TableSink<?> tableSink) {
+    /** TODO FLINK-36132 Remove this method later. */
+    private void registerTableSinkInternal(String name, TableSink<?> tableSink) {
Review Comment:
I'm a bit doubtful here: why can't we remove it directly, since no one calls it?
##########
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/utils/TpcdsResultComparator.java:
##########
@@ -52,26 +54,30 @@ public class TpcdsResultComparator {
private static final String RESULT_SUFFIX = ".ans";
private static final double TOLERATED_DOUBLE_DEVIATION = 0.01d;
- public static void main(String[] args) throws Exception {
+ public static void main2(String[] args) throws Exception {
Review Comment:
I'm curious why you renamed it to `main2`?
##########
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java:
##########
@@ -107,23 +113,44 @@ public static void main(String[] args) throws Exception {
// register sink table
String sinkTableName = QUERY_PREFIX + queryId + "_sinkTable";
-            ((TableEnvironmentInternal) tableEnvironment)
-                    .registerTableSinkInternal(
-                            sinkTableName,
-                            new CsvTableSink(
-                                    sinkTablePath + FILE_SEPARATOR + queryId + RESULT_SUFFIX,
-                                    COL_DELIMITER,
-                                    1,
-                                    FileSystem.WriteMode.OVERWRITE,
-                                    resultTable.getSchema().getFieldNames(),
-                                    resultTable.getSchema().getFieldDataTypes()));
+            tableEnvironment.createTable(
+                    sinkTableName,
+                    getTableDescriptor(
+                            sinkTablePath + FILE_SEPARATOR + queryId + RESULT_SUFFIX,
+                            resultTable.getSchema().getFieldNames(),
+                            resultTable.getSchema().getFieldDataTypes()));
TableResult tableResult = resultTable.executeInsert(sinkTableName);
// wait job finish
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println("[INFO]Run TPC-DS query " + queryId + " success.");
}
}
+ private static TableDescriptor getTableDescriptor(
+ String path, String[] fieldNames, DataType[] fieldDataTypes) {
+ final Schema.Builder schemaBuilder = Schema.newBuilder();
+ for (int i = 0; i < fieldNames.length; i++) {
+ schemaBuilder.column(fieldNames[i], fieldDataTypes[i]);
+ }
+ final Schema schema = schemaBuilder.build();
+
+ final FormatDescriptor formatDescriptor =
+ FormatDescriptor.forFormat("csv")
+                        .option(CsvFormatOptions.FIELD_DELIMITER, COL_DELIMITER)
+ .build();
+
+ return TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
+ .schema(schema)
+ .option(FileSystemConnectorOptions.PATH, path)
Review Comment:
I'm just curious: is the current write mode `insert into` or `insert overwrite`?
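For reference, a minimal sketch (my own, not from the PR) of how the old `FileSystem.WriteMode.OVERWRITE` behavior could be requested explicitly, assuming the overloaded `executeInsert(String, boolean)` is the intended replacement:

    // Sketch only: pass the overwrite flag explicitly rather than relying on
    // the sink's default append semantics (true -> INSERT OVERWRITE,
    // false -> INSERT INTO; overwrite requires batch execution mode).
    TableResult tableResult = resultTable.executeInsert(sinkTableName, true);
    tableResult.getJobClient().get().getJobExecutionResult().get();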
##########
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/utils/TpcdsResultComparator.java:
##########
@@ -52,26 +54,30 @@ public class TpcdsResultComparator {
private static final String RESULT_SUFFIX = ".ans";
private static final double TOLERATED_DOUBLE_DEVIATION = 0.01d;
- public static void main(String[] args) throws Exception {
+ public static void main2(String[] args) throws Exception {
Review Comment:
BTW, I see that no one calls the `TpcdsResultComparator` class, so why do we need to modify it?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -110,26 +111,25 @@ object TestTableSourceSinks {
def createCsvTemporarySinkTable(
tEnv: TableEnvironment,
- schema: TableSchema,
+ tableSchema: TableSchema,
tableName: String,
numFiles: Int = 1): String = {
- val tempFile = File.createTempFile("csv-test", null)
- tempFile.deleteOnExit()
- val path = tempFile.getAbsolutePath
-
- val sinkOptions = collection.mutable.Map(
- "connector.type" -> "filesystem",
- "connector.path" -> path,
- "format.type" -> "csv",
- "format.write-mode" -> "OVERWRITE",
- "format.num-files" -> numFiles.toString
- )
- sinkOptions.putAll(new Schema().schema(schema).toProperties)
-
-    val sink = new CsvBatchTableSinkFactory().createStreamTableSink(sinkOptions);
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(tableName, sink)
-
-    path
+ val tempDir = Files.createTempDirectory("csv-test").toFile
+ tempDir.deleteOnExit()
+ val tempDirPath = tempDir.getAbsolutePath
+
+ Files.createTempDirectory("csv-test")
+ val schema = tableSchema.toSchema
+ val tableDescriptor = TableDescriptor
Review Comment:
You have removed the `format.write-mode` option; what is the write mode now, `insert into` or `insert overwrite`?
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -110,26 +111,25 @@ object TestTableSourceSinks {
def createCsvTemporarySinkTable(
tEnv: TableEnvironment,
- schema: TableSchema,
+ tableSchema: TableSchema,
tableName: String,
numFiles: Int = 1): String = {
- val tempFile = File.createTempFile("csv-test", null)
- tempFile.deleteOnExit()
- val path = tempFile.getAbsolutePath
-
- val sinkOptions = collection.mutable.Map(
- "connector.type" -> "filesystem",
- "connector.path" -> path,
- "format.type" -> "csv",
- "format.write-mode" -> "OVERWRITE",
- "format.num-files" -> numFiles.toString
- )
- sinkOptions.putAll(new Schema().schema(schema).toProperties)
-
-    val sink = new CsvBatchTableSinkFactory().createStreamTableSink(sinkOptions);
-    tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(tableName, sink)
-
-    path
+ val tempDir = Files.createTempDirectory("csv-test").toFile
+ tempDir.deleteOnExit()
+ val tempDirPath = tempDir.getAbsolutePath
+
+ Files.createTempDirectory("csv-test")
Review Comment:
Why is the temp dir created again here?
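For illustration, a minimal sketch (in Java, assuming a single temp directory is all that is needed) of the setup without the stray second call:

    import java.io.File;
    import java.nio.file.Files;

    // Create the sink's temp directory exactly once; the second
    // Files.createTempDirectory("csv-test") call in the diff creates an
    // unused directory and looks like a leftover.
    File tempDir = Files.createTempDirectory("csv-test").toFile();
    tempDir.deleteOnExit();
    String tempDirPath = tempDir.getAbsolutePath();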