This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6c65805699 [Improve] Improve read with parquet type convert error (#6683)
6c65805699 is described below
commit 6c658056993006bb5429e9af58107c29d4f864dd
Author: Jia Fan <[email protected]>
AuthorDate: Fri Apr 12 10:27:12 2024 +0800
[Improve] Improve read with parquet type convert error (#6683)
---
.../file/source/reader/ParquetReadStrategy.java | 60 +++++++++++----------
.../seatunnel/file/source/reader/ReadStrategy.java | 36 +++++++++++++
.../file/writer/ParquetReadStrategyTest.java | 63 ++++++++++++++++++++++
3 files changed, 132 insertions(+), 27 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index cbc39b2f97..1264df9807 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -67,6 +69,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
@Slf4j
public class ParquetReadStrategy extends AbstractReadStrategy {
@@ -75,6 +78,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy
{
private static final long NANOS_PER_MILLISECOND = 1000000;
private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+ private static final String PARQUET = "Parquet";
private int[] indexes;
@@ -234,6 +238,12 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException {
+ return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath,
String path)
+ throws FileConnectorException {
ParquetMetadata metadata;
try (ParquetFileReader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,19 +269,22 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
indexes = new int[readColumns.size()];
- for (int i = 0; i < readColumns.size(); i++) {
- fields[i] = readColumns.get(i);
- Type type = originalSchema.getType(fields[i]);
- int fieldIndex = originalSchema.getFieldIndex(fields[i]);
- indexes[i] = fieldIndex;
- types[i] = parquetType2SeaTunnelType(type);
- }
+ buildColumnsWithErrorCheck(
+ tablePath,
+ IntStream.range(0, readColumns.size()).iterator(),
+ i -> {
+ fields[i] = readColumns.get(i);
+ Type type = originalSchema.getType(fields[i]);
+ int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+ indexes[i] = fieldIndex;
+ types[i] = parquetType2SeaTunnelType(type, fields[i]);
+ });
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
return getActualSeaTunnelRowTypeInfo();
}
- private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String
name) {
if (type.isPrimitive()) {
switch (type.asPrimitiveType().getPrimitiveTypeName()) {
case INT32:
@@ -287,9 +300,8 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
default:
- String errorMsg = String.format("Not support this
type [%s]", type);
- throw new FileConnectorException(
-
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+ throw CommonError.convertToSeaTunnelTypeError(
+ PARQUET, type.toString(), name);
}
case INT64:
if (type.asPrimitiveType().getOriginalType() ==
OriginalType.TIMESTAMP_MILLIS) {
@@ -324,9 +336,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
int scale = Integer.parseInt(splits[1]);
return new DecimalType(precision, scale);
default:
- String errorMsg = String.format("Not support this type
[%s]", type);
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
errorMsg);
+ throw CommonError.convertToSeaTunnelTypeError("Parquet",
type.toString(), name);
}
} else {
LogicalTypeAnnotation logicalTypeAnnotation =
@@ -339,7 +349,7 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
for (int i = 0; i < fields.size(); i++) {
Type fieldType = fields.get(i);
SeaTunnelDataType<?> seaTunnelDataType =
- parquetType2SeaTunnelType(fields.get(i));
+ parquetType2SeaTunnelType(fields.get(i), name);
fieldNames[i] = fieldType.getName();
seaTunnelDataTypes[i] = seaTunnelDataType;
}
@@ -349,9 +359,9 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
case MAP:
GroupType groupType =
type.asGroupType().getType(0).asGroupType();
SeaTunnelDataType<?> keyType =
-
parquetType2SeaTunnelType(groupType.getType(0));
+
parquetType2SeaTunnelType(groupType.getType(0), name);
SeaTunnelDataType<?> valueType =
-
parquetType2SeaTunnelType(groupType.getType(1));
+
parquetType2SeaTunnelType(groupType.getType(1), name);
return new MapType<>(keyType, valueType);
case LIST:
Type elementType;
@@ -360,7 +370,8 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
} catch (Exception e) {
elementType = type.asGroupType().getType(0);
}
- SeaTunnelDataType<?> fieldType =
parquetType2SeaTunnelType(elementType);
+ SeaTunnelDataType<?> fieldType =
+ parquetType2SeaTunnelType(elementType, name);
switch (fieldType.getSqlType()) {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
@@ -379,17 +390,12 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
case DOUBLE:
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
- String errorMsg =
- String.format(
- "SeaTunnel array type not
supported this genericType [%s] yet",
- fieldType);
- throw new FileConnectorException(
-
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+ throw CommonError.convertToSeaTunnelTypeError(
+ PARQUET, type.toString(), name);
}
default:
- throw new FileConnectorException(
-
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
- "SeaTunnel file connector not support this
nest type");
+ throw CommonError.convertToSeaTunnelTypeError(
+ PARQUET, type.toString(), name);
}
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index d3a210f56e..c5bdf28124 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -20,15 +20,23 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
public interface ReadStrategy extends Serializable, Closeable {
void init(HadoopConf conf);
@@ -38,6 +46,11 @@ public interface ReadStrategy extends Serializable,
Closeable {
SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException;
+ default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath,
String path)
+ throws FileConnectorException {
+ return getSeaTunnelRowTypeInfo(path);
+ }
+
default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
String path, SeaTunnelRowType rowType) throws
FileConnectorException {
return getSeaTunnelRowTypeInfo(path);
@@ -53,4 +66,27 @@ public interface ReadStrategy extends Serializable,
Closeable {
// todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
+
+ default <T> void buildColumnsWithErrorCheck(
+ TablePath tablePath, Iterator<T> keys, Consumer<T> getDataType) {
+ Map<String, String> unsupported = new LinkedHashMap<>();
+ while (keys.hasNext()) {
+ try {
+ getDataType.accept(keys.next());
+ } catch (SeaTunnelRuntimeException e) {
+ if (e.getSeaTunnelErrorCode()
+
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+ unsupported.put(e.getParams().get("field"),
e.getParams().get("dataType"));
+ } else {
+ throw e;
+ }
+ }
+ }
+ if (!unsupported.isEmpty()) {
+ throw CommonError.getCatalogTableWithUnsupportedType(
+ this.getClass().getSimpleName().replace("ReadStrategy",
""),
+ tablePath.getFullName(),
+ unsupported);
+ }
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 23ebc24093..66af39f18d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
@@ -189,6 +190,27 @@ public class ParquetReadStrategyTest {
AutoGenerateParquetData.deleteFile();
}
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testParquetReadUnsupportedType() throws Exception {
+ AutoGenerateParquetDataWithUnsupportedType.generateTestData();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+ SeaTunnelRuntimeException exception =
+ Assertions.assertThrows(
+ SeaTunnelRuntimeException.class,
+ () ->
+ parquetReadStrategy.getSeaTunnelRowTypeInfo(
+
AutoGenerateParquetDataWithUnsupportedType.DATA_FILE_PATH));
+ Assertions.assertEquals(
+ "ErrorCode:[COMMON-20], ErrorDescription:['Parquet' table
'default.default.default' unsupported get catalog table with field data types"
+ + " '{\"id\":\"required group id (LIST) {\\n repeated
group array (LIST) {\\n repeated binary array;\\n
}\\n}\",\"id2\":\"required group id2 (LIST) {\\n repeated group array (LIST)"
+ + " {\\n repeated binary array;\\n }\\n}\"}']",
+ exception.getMessage());
+ AutoGenerateParquetData.deleteFile();
+ }
+
public static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -281,4 +303,45 @@ public class ParquetReadStrategyTest {
}
}
}
+
+ public static class AutoGenerateParquetDataWithUnsupportedType {
+
+ public static final String DATA_FILE_PATH =
"/tmp/data_unsupported.parquet";
+
+ public static void generateTestData() throws IOException {
+ deleteFile();
+ String schemaString =
+
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":
\"array\", \"items\": {\"type\": \"array\", \"items\":
\"bytes\"}}},{\"name\":\"id2\",\"type\":{\"type\": \"array\", \"items\":
{\"type\": \"array\", \"items\":
\"bytes\"}}},{\"name\":\"long\",\"type\":\"long\"}]}";
+ Schema schema = new Schema.Parser().parse(schemaString);
+
+ Configuration conf = new Configuration();
+
+ Path file = new Path(DATA_FILE_PATH);
+
+ ParquetWriter<GenericRecord> writer =
+ AvroParquetWriter.<GenericRecord>builder(file)
+ .withSchema(schema)
+ .withConf(conf)
+ .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .build();
+
+ GenericRecord record1 = new GenericData.Record(schema);
+ GenericArray<GenericData.Array<Utf8>> id =
+ new GenericData.Array<>(2, schema.getField("id").schema());
+ id.add(new GenericData.Array<>(2,
schema.getField("id").schema().getElementType()));
+ id.add(new GenericData.Array<>(2,
schema.getField("id").schema().getElementType()));
+ record1.put("id", id);
+ record1.put("id2", id);
+ record1.put("long", Long.MAX_VALUE);
+ writer.write(record1);
+ writer.close();
+ }
+
+ public static void deleteFile() {
+ File parquetFile = new File(DATA_FILE_PATH);
+ if (parquetFile.exists()) {
+ parquetFile.delete();
+ }
+ }
+ }
}