zhuzhurk commented on a change in pull request #19152:
URL: https://github.com/apache/flink/pull/19152#discussion_r830729038
##########
File path:
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/TpcdsTestProgram.java
##########
@@ -157,34 +166,33 @@ private static TableEnvironment prepareTableEnv(String sourceTablePath, Boolean
         TPCDS_TABLES.forEach(
                 table -> {
                     TpcdsSchema schema = TpcdsSchemaProvider.getTableSchema(table);
-                    CsvTableSource.Builder builder = CsvTableSource.builder();
-                    builder.path(sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX);
-                    for (int i = 0; i < schema.getFieldNames().size(); i++) {
-                        builder.field(
-                                schema.getFieldNames().get(i),
-                                TypeConversions.fromDataTypeToLegacyInfo(
-                                        schema.getFieldTypes().get(i)));
-                    }
-                    builder.fieldDelimiter(COL_DELIMITER);
-                    builder.emptyColumnAsNull();
-                    builder.lineDelimiter("\n");
-                    CsvTableSource tableSource = builder.build();
-                    ConnectorCatalogTable catalogTable =
-                            ConnectorCatalogTable.source(tableSource, true);
-                    tEnv.getCatalog(tEnv.getCurrentCatalog())
-                            .ifPresent(
-                                    catalog -> {
-                                        try {
-                                            catalog.createTable(
-                                                    new ObjectPath(
-                                                            tEnv.getCurrentDatabase(), table),
-                                                    catalogTable,
-                                                    false);
-                                        } catch (Exception e) {
-                                            throw new RuntimeException(e);
-                                        }
-                                    });
+                    String filePath = sourceTablePath + FILE_SEPARATOR + table + DATA_SUFFIX;
+
+                    tEnv.createTable(
+                            table,
+                            TableDescriptor.forConnector(FILE_CONNECTOR_NAME)
+                                    .schema(
+                                            Schema.newBuilder()
+                                                    .fromFields(
+                                                            schema.getFieldNames(),
+                                                            schema.getFieldTypes())
+                                                    .build())
+                                    .format(
+                                            FormatDescriptor.forFormat(CSV_FORMAT)
+                                                    .option(
+                                                            CsvFormatOptions.FIELD_DELIMITER,
+                                                            COL_DELIMITER)
+                                                    .option(
+                                                            CsvFormatOptions.IGNORE_PARSE_ERRORS,
Review comment:
It looks to me like this option does not need to be `true`, because
`ignoreParseErrors()` was never invoked on the old `CsvTableSource`.
Correct me if parse errors do actually occur in this case.
##########
File path:
flink-end-to-end-tests/flink-tpcds-test/src/main/java/org/apache/flink/table/tpcds/utils/TpcdsResultComparator.java
##########
@@ -61,7 +62,8 @@ public static void main(String[] args) throws Exception {
         for (String queryId : VALIDATE_QUERIES) {
             File expectedFile = new File(expectedDir, queryId + RESULT_SUFFIX);
-            File actualFile = new File(actualDir, queryId + RESULT_SUFFIX);
+            File actualFileDirectory = new File(actualDir, queryId + RESULT_SUFFIX);
+            File actualFile = Objects.requireNonNull(actualFileDirectory.listFiles())[0];
Review comment:
Would you explain a bit why the actual file is resolved differently in this
case? Does this mean there is a behavior change when switching from the old
CSV sink to the new one?
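If I understand the change, the new filesystem sink writes each query result as a *directory* of part files rather than a single flat file, which would explain the `listFiles()[0]` lookup. A minimal shell sketch of that assumed layout (all paths and file names hypothetical, not taken from the PR):

```shell
# Assumed layout: the new filesystem sink produces a directory per result,
# containing one or more part files (names here are hypothetical).
result_dir=/tmp/tpcds_result/query1.ans
mkdir -p "$result_dir"
printf 'row1\nrow2\n' > "$result_dir/part-00000"

# The comparator would then read the first file inside that directory,
# mirroring actualFileDirectory.listFiles()[0] in the Java change.
actual_file="$result_dir/$(ls "$result_dir" | head -n 1)"
cat "$actual_file"
```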
##########
File path: flink-end-to-end-tests/flink-tpcds-test/tpcds-tool/data_generator.sh
##########
@@ -130,6 +130,10 @@ elif [[ "$OS_TYPE" == "linux" ]]; then
     chmod +x $generator_dir/dsdgen_linux
     cd $generator_dir
     ./dsdgen_linux -SCALE $scale_factor -FORCE Y -DIR $data_dir
+    echo "[INFO] `date +%H:%M:%S` Convert file encoding of customer.dat to UTF-8 start."
Review comment:
Would you add a comment explaining that "the data generator may generate
files encoded in latin1, which the new CSV source cannot read"?
Also, this looks to me like a bug in the new CSV source that should be
fixed. Once it is fixed, we can remove this workaround.
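For reference, the re-encoding step the script needs could be sketched like this (paths are hypothetical and `iconv` is assumed to be available on the build image; this is not the exact code in the PR):

```shell
# Sketch of the workaround: dsdgen may emit customer.dat encoded in latin1,
# so re-encode it to UTF-8 before the new CSV source reads it.
# (Hypothetical path; iconv assumed available.)
data_dir=/tmp/tpcds_data
mkdir -p "$data_dir"
printf 'M\374ller\n' > "$data_dir/customer.dat"   # \374 is latin1 for "ü"

iconv -f latin1 -t utf-8 "$data_dir/customer.dat" > "$data_dir/customer.dat.utf8"
mv "$data_dir/customer.dat.utf8" "$data_dir/customer.dat"
```

Once the new CSV source handles non-UTF-8 input itself, this conversion step can be dropped.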
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]