This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6c65805699 [Improve] Improve read with parquet type convert error (#6683)
6c65805699 is described below

commit 6c658056993006bb5429e9af58107c29d4f864dd
Author: Jia Fan <[email protected]>
AuthorDate: Fri Apr 12 10:27:12 2024 +0800

    [Improve] Improve read with parquet type convert error (#6683)
---
 .../file/source/reader/ParquetReadStrategy.java    | 60 +++++++++++----------
 .../seatunnel/file/source/reader/ReadStrategy.java | 36 +++++++++++++
 .../file/writer/ParquetReadStrategyTest.java       | 63 ++++++++++++++++++++++
 3 files changed, 132 insertions(+), 27 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index cbc39b2f97..1264df9807 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
@@ -28,6 +29,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -67,6 +69,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 @Slf4j
 public class ParquetReadStrategy extends AbstractReadStrategy {
@@ -75,6 +78,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy 
{
     private static final long NANOS_PER_MILLISECOND = 1000000;
     private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
     private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+    private static final String PARQUET = "Parquet";
 
     private int[] indexes;
 
@@ -234,6 +238,12 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws 
FileConnectorException {
+        return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, 
String path)
+            throws FileConnectorException {
         ParquetMetadata metadata;
         try (ParquetFileReader reader =
                 hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,19 +269,22 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
         String[] fields = new String[readColumns.size()];
         SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
         indexes = new int[readColumns.size()];
-        for (int i = 0; i < readColumns.size(); i++) {
-            fields[i] = readColumns.get(i);
-            Type type = originalSchema.getType(fields[i]);
-            int fieldIndex = originalSchema.getFieldIndex(fields[i]);
-            indexes[i] = fieldIndex;
-            types[i] = parquetType2SeaTunnelType(type);
-        }
+        buildColumnsWithErrorCheck(
+                tablePath,
+                IntStream.range(0, readColumns.size()).iterator(),
+                i -> {
+                    fields[i] = readColumns.get(i);
+                    Type type = originalSchema.getType(fields[i]);
+                    int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+                    indexes[i] = fieldIndex;
+                    types[i] = parquetType2SeaTunnelType(type, fields[i]);
+                });
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
         return getActualSeaTunnelRowTypeInfo();
     }
 
-    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String 
name) {
         if (type.isPrimitive()) {
             switch (type.asPrimitiveType().getPrimitiveTypeName()) {
                 case INT32:
@@ -287,9 +300,8 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                         case DATE:
                             return LocalTimeType.LOCAL_DATE_TYPE;
                         default:
-                            String errorMsg = String.format("Not support this 
type [%s]", type);
-                            throw new FileConnectorException(
-                                    
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                            throw CommonError.convertToSeaTunnelTypeError(
+                                    PARQUET, type.toString(), name);
                     }
                 case INT64:
                     if (type.asPrimitiveType().getOriginalType() == 
OriginalType.TIMESTAMP_MILLIS) {
@@ -324,9 +336,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     int scale = Integer.parseInt(splits[1]);
                     return new DecimalType(precision, scale);
                 default:
-                    String errorMsg = String.format("Not support this type 
[%s]", type);
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, 
errorMsg);
+                    throw CommonError.convertToSeaTunnelTypeError("Parquet", 
type.toString(), name);
             }
         } else {
             LogicalTypeAnnotation logicalTypeAnnotation =
@@ -339,7 +349,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                 for (int i = 0; i < fields.size(); i++) {
                     Type fieldType = fields.get(i);
                     SeaTunnelDataType<?> seaTunnelDataType =
-                            parquetType2SeaTunnelType(fields.get(i));
+                            parquetType2SeaTunnelType(fields.get(i), name);
                     fieldNames[i] = fieldType.getName();
                     seaTunnelDataTypes[i] = seaTunnelDataType;
                 }
@@ -349,9 +359,9 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     case MAP:
                         GroupType groupType = 
type.asGroupType().getType(0).asGroupType();
                         SeaTunnelDataType<?> keyType =
-                                
parquetType2SeaTunnelType(groupType.getType(0));
+                                
parquetType2SeaTunnelType(groupType.getType(0), name);
                         SeaTunnelDataType<?> valueType =
-                                
parquetType2SeaTunnelType(groupType.getType(1));
+                                
parquetType2SeaTunnelType(groupType.getType(1), name);
                         return new MapType<>(keyType, valueType);
                     case LIST:
                         Type elementType;
@@ -360,7 +370,8 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                         } catch (Exception e) {
                             elementType = type.asGroupType().getType(0);
                         }
-                        SeaTunnelDataType<?> fieldType = 
parquetType2SeaTunnelType(elementType);
+                        SeaTunnelDataType<?> fieldType =
+                                parquetType2SeaTunnelType(elementType, name);
                         switch (fieldType.getSqlType()) {
                             case STRING:
                                 return ArrayType.STRING_ARRAY_TYPE;
@@ -379,17 +390,12 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                             case DOUBLE:
                                 return ArrayType.DOUBLE_ARRAY_TYPE;
                             default:
-                                String errorMsg =
-                                        String.format(
-                                                "SeaTunnel array type not 
supported this genericType [%s] yet",
-                                                fieldType);
-                                throw new FileConnectorException(
-                                        
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                                throw CommonError.convertToSeaTunnelTypeError(
+                                        PARQUET, type.toString(), name);
                         }
                     default:
-                        throw new FileConnectorException(
-                                
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                                "SeaTunnel file connector not support this 
nest type");
+                        throw CommonError.convertToSeaTunnelTypeError(
+                                PARQUET, type.toString(), name);
                 }
             }
         }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index d3a210f56e..c5bdf28124 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -20,15 +20,23 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
 
 public interface ReadStrategy extends Serializable, Closeable {
     void init(HadoopConf conf);
@@ -38,6 +46,11 @@ public interface ReadStrategy extends Serializable, 
Closeable {
 
     SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws 
FileConnectorException;
 
+    default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, 
String path)
+            throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(path);
+    }
+
     default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
             String path, SeaTunnelRowType rowType) throws 
FileConnectorException {
         return getSeaTunnelRowTypeInfo(path);
@@ -53,4 +66,27 @@ public interface ReadStrategy extends Serializable, 
Closeable {
 
     // todo: use CatalogTable
     SeaTunnelRowType getActualSeaTunnelRowTypeInfo();
+
+    default <T> void buildColumnsWithErrorCheck(
+            TablePath tablePath, Iterator<T> keys, Consumer<T> getDataType) {
+        Map<String, String> unsupported = new LinkedHashMap<>();
+        while (keys.hasNext()) {
+            try {
+                getDataType.accept(keys.next());
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        
.equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), 
e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType(
+                    this.getClass().getSimpleName().replace("ReadStrategy", 
""),
+                    tablePath.getFullName(),
+                    unsupported);
+        }
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 23ebc24093..66af39f18d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
@@ -189,6 +190,27 @@ public class ParquetReadStrategyTest {
         AutoGenerateParquetData.deleteFile();
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadUnsupportedType() throws Exception {
+        AutoGenerateParquetDataWithUnsupportedType.generateTestData();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                                        
AutoGenerateParquetDataWithUnsupportedType.DATA_FILE_PATH));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-20], ErrorDescription:['Parquet' table 
'default.default.default' unsupported get catalog table with field data types"
+                        + " '{\"id\":\"required group id (LIST) {\\n  repeated 
group array (LIST) {\\n    repeated binary array;\\n  
}\\n}\",\"id2\":\"required group id2 (LIST) {\\n  repeated group array (LIST)"
+                        + " {\\n    repeated binary array;\\n  }\\n}\"}']",
+                exception.getMessage());
+        AutoGenerateParquetData.deleteFile();
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -281,4 +303,45 @@ public class ParquetReadStrategyTest {
             }
         }
     }
+
+    public static class AutoGenerateParquetDataWithUnsupportedType {
+
+        public static final String DATA_FILE_PATH = 
"/tmp/data_unsupported.parquet";
+
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            String schemaString =
+                    
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":
 \"array\", \"items\": {\"type\": \"array\", \"items\": 
\"bytes\"}}},{\"name\":\"id2\",\"type\":{\"type\": \"array\", \"items\": 
{\"type\": \"array\", \"items\": 
\"bytes\"}}},{\"name\":\"long\",\"type\":\"long\"}]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            Configuration conf = new Configuration();
+
+            Path file = new Path(DATA_FILE_PATH);
+
+            ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(file)
+                            .withSchema(schema)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build();
+
+            GenericRecord record1 = new GenericData.Record(schema);
+            GenericArray<GenericData.Array<Utf8>> id =
+                    new GenericData.Array<>(2, schema.getField("id").schema());
+            id.add(new GenericData.Array<>(2, 
schema.getField("id").schema().getElementType()));
+            id.add(new GenericData.Array<>(2, 
schema.getField("id").schema().getElementType()));
+            record1.put("id", id);
+            record1.put("id2", id);
+            record1.put("long", Long.MAX_VALUE);
+            writer.write(record1);
+            writer.close();
+        }
+
+        public static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }

Reply via email to