xuyangzhong commented on code in PR #25889:
URL: https://github.com/apache/flink/pull/25889#discussion_r1904983840
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala:
##########
@@ -110,26 +111,25 @@ object TestTableSourceSinks {
def createCsvTemporarySinkTable(
tEnv: TableEnvironment,
- schema: TableSchema,
+ tableSchema: TableSchema,
tableName: String,
numFiles: Int = 1): String = {
- val tempFile = File.createTempFile("csv-test", null)
- tempFile.deleteOnExit()
- val path = tempFile.getAbsolutePath
-
- val sinkOptions = collection.mutable.Map(
- "connector.type" -> "filesystem",
- "connector.path" -> path,
- "format.type" -> "csv",
- "format.write-mode" -> "OVERWRITE",
- "format.num-files" -> numFiles.toString
- )
- sinkOptions.putAll(new Schema().schema(schema).toProperties)
-
- val sink = new
CsvBatchTableSinkFactory().createStreamTableSink(sinkOptions);
-
tEnv.asInstanceOf[TableEnvironmentInternal].registerTableSinkInternal(tableName,
sink)
-
- path
+ val tempDir = Files.createTempDirectory("csv-test").toFile
+ tempDir.deleteOnExit()
+ val tempDirPath = tempDir.getAbsolutePath
+
+ Files.createTempDirectory("csv-test")
+ val schema = tableSchema.toSchema
+ val tableDescriptor = TableDescriptor
Review Comment:
The old CSV sink writes to a single specified file, so it needs to distinguish
between the `append` and `overwrite` write modes. In contrast, the new filesystem
connector writes to a directory, so it does not need to be aware of an `append`
or `overwrite` write mode.
##########
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java:
##########
@@ -107,23 +113,44 @@ public static void main(String[] args) throws Exception {
// register sink table
String sinkTableName = QUERY_PREFIX + queryId + "_sinkTable";
- ((TableEnvironmentInternal) tableEnvironment)
- .registerTableSinkInternal(
- sinkTableName,
- new CsvTableSink(
- sinkTablePath + FILE_SEPARATOR + queryId +
RESULT_SUFFIX,
- COL_DELIMITER,
- 1,
- FileSystem.WriteMode.OVERWRITE,
- resultTable.getSchema().getFieldNames(),
-
resultTable.getSchema().getFieldDataTypes()));
+ tableEnvironment.createTable(
+ sinkTableName,
+ getTableDescriptor(
+ sinkTablePath + FILE_SEPARATOR + queryId +
RESULT_SUFFIX,
+ resultTable.getSchema().getFieldNames(),
+ resultTable.getSchema().getFieldDataTypes()));
TableResult tableResult = resultTable.executeInsert(sinkTableName);
// wait job finish
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println("[INFO]Run TPC-DS query " + queryId + "
success.");
}
}
+ private static TableDescriptor getTableDescriptor(
+ String path, String[] fieldNames, DataType[] fieldDataTypes) {
+ final Schema.Builder schemaBuilder = Schema.newBuilder();
+ for (int i = 0; i < fieldNames.length; i++) {
+ schemaBuilder.column(fieldNames[i], fieldDataTypes[i]);
+ }
+ final Schema schema = schemaBuilder.build();
+
+ final FormatDescriptor formatDescriptor =
+ FormatDescriptor.forFormat("csv")
+ .option(CsvFormatOptions.FIELD_DELIMITER,
COL_DELIMITER)
+ .build();
+
+ return TableDescriptor.forConnector(FileSystemTableFactory.IDENTIFIER)
+ .schema(schema)
+ .option(FileSystemConnectorOptions.PATH, path)
Review Comment:
The old CSV sink writes to a single specified file, so it needs to distinguish
between the `append` and `overwrite` write modes. In contrast, the new filesystem
connector writes to a directory, so it does not need to be aware of an `append`
or `overwrite` write mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]