This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e24843515 [Improve][Connector-V2][Iceberg] Unified exception for 
iceberg source connector (#3677)
e24843515 is described below

commit e24843515ffdbcd58a54f1306987cf36d4bb370d
Author: ChunFuWu <[email protected]>
AuthorDate: Wed Jan 11 22:15:22 2023 +0800

    [Improve][Connector-V2][Iceberg] Unified exception for iceberg source 
connector (#3677)
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source 
connector
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source 
connector
    
    * [Improve][Connector-V2][Iceberg] Unified exception for iceberg source 
connector
    
    Co-authored-by: Tyrantlucifer <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   |  8 ++++
 .../seatunnel/iceberg/IcebergCatalogFactory.java   |  5 ++-
 .../iceberg/data/DefaultDeserializer.java          |  6 ++-
 .../seatunnel/iceberg/data/IcebergTypeMapper.java  | 11 ++++--
 .../exception/IcebergConnectorErrorCode.java       | 44 ++++++++++++++++++++++
 .../exception/IcebergConnectorException.java       | 36 ++++++++++++++++++
 .../enumerator/scan/IcebergScanSplitPlanner.java   | 13 +++++--
 .../source/reader/IcebergFileScanTaskReader.java   | 12 ++++--
 .../reader/IcebergFileScanTaskSplitReader.java     | 10 +++--
 9 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index b684779b9..37580705d 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -206,8 +206,16 @@ problems encountered by users.
 | DINGTALK-01 | Send response to DingTalk server failed | When users encounter 
this error code, it means that sending the response message to the DingTalk server failed, 
please check it |
 | DINGTALK-02 | Get sign from DingTalk server failed    | When users encounter 
this error code, it means that getting the signature from the DingTalk server failed, 
please check it      |
 
+## Iceberg Connector Error Codes
+
+| code       | description                     | solution                      
                                                                           |
+|------------|---------------------------------|----------------------------------------------------------------------------------------------------------|
+| ICEBERG-01 | File Scan Split failed          | When users encounter this 
error code, it means that an I/O error occurred while scanning the Iceberg table for file scan splits. Please check that the table and its files are accessible |
+| ICEBERG-02 | Invalid starting record offset  | When users encounter this 
error code, it means that the starting record offset of a split is larger than the number of records that can be read from it. Please check the record offset of the split |
+
 ## Email Connector Error Codes
 
 | code        | description       | solution                                   
                                                                                
                                         |
 
|-------------|-------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | EMAIL-01    | Send email failed | When users encounter this error code, it 
means that send email to target server failed, please adjust the network 
environment according to the abnormal information |
+
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
index 480c80bd8..50039eb00 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.NonNull;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +66,8 @@ public class IcebergCatalogFactory implements Serializable {
                 properties.put(CatalogProperties.URI, uri);
                 return hive(catalogName, serializableConf, properties);
             default:
-                throw new UnsupportedOperationException("Unsupported 
catalogType: " + catalogType);
+                throw new 
IcebergConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Unsupported catalogType: %s", catalogType));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
index 7ae641d14..00e25f18b 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/DefaultDeserializer.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.AllArgsConstructor;
 import lombok.NonNull;
@@ -133,7 +135,9 @@ public class DefaultDeserializer implements Deserializer {
                 }
                 return seatunnelMap;
             default:
-                throw new UnsupportedOperationException("Unsupported iceberg 
type: " + icebergType);
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    String.format("Unsupported iceberg type: %s", 
icebergType));
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
index 485ecaf88..b4893a86b 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/data/IcebergTypeMapper.java
@@ -25,6 +25,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import lombok.NonNull;
 import org.apache.iceberg.types.Type;
@@ -68,7 +70,8 @@ public class IcebergTypeMapper {
             case MAP:
                 return mappingMapType((Types.MapType) icebergType);
             default:
-                throw new UnsupportedOperationException(
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                     "Unsupported iceberg data type: " + icebergType.typeId());
         }
     }
@@ -100,8 +103,10 @@ public class IcebergTypeMapper {
             case STRING:
                 return ArrayType.STRING_ARRAY_TYPE;
             default:
-                throw new UnsupportedOperationException(
-                    "Unsupported iceberg list element type: " + 
listType.elementType().typeId());
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                    "Unsupported iceberg list element type: " +
+                        listType.elementType().typeId());
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
new file mode 100644
index 000000000..878162380
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum IcebergConnectorErrorCode implements SeaTunnelErrorCode {
+
+    FILE_SCAN_SPLIT_FAILED("ICEBERG-01", "File Scan Split failed"),
+    INVALID_STARTING_RECORD_OFFSET("ICEBERG-02", "Invalid starting record 
offset");
+
+    private final String code;
+    private final String description;
+
+    IcebergConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
new file mode 100644
index 000000000..0279c46b3
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/exception/IcebergConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class IcebergConnectorException extends SeaTunnelRuntimeException {
+
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public IcebergConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
index f3d29f814..d41a5d813 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java
@@ -19,6 +19,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.scan
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumerationResult;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator.IcebergEnumeratorPosition;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
@@ -36,7 +39,6 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.util.SnapshotUtil;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -135,8 +137,10 @@ public class IcebergScanSplitPlanner {
                     return Optional.of(SnapshotUtil.snapshotAfter(table, 
snapshotIdAsOfTime));
                 }
             default:
-                throw new UnsupportedOperationException("Unsupported stream 
scan strategy: " +
-                    icebergScanContext.getStreamScanStrategy());
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    "Unsupported stream scan strategy: " +
+                        icebergScanContext.getStreamScanStrategy());
         }
     }
 
@@ -151,7 +155,8 @@ public class IcebergScanSplitPlanner {
             }
             return splits;
         } catch (IOException e) {
-            throw new UncheckedIOException(
+            throw new IcebergConnectorException(
+                IcebergConnectorErrorCode.FILE_SCAN_SPLIT_FAILED,
                 "Failed to scan iceberg splits from: " + table.name(), e);
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
index a50706847..8f1ddd884 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskReader.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.data.IcebergRecordProjection;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 
 import com.google.common.collect.Sets;
 import lombok.Builder;
@@ -93,7 +95,9 @@ public class IcebergFileScanTaskReader implements Closeable {
 
     private CloseableIterable<Record> openFile(FileScanTask task, Schema 
fileProjection) {
         if (task.isDataTask()) {
-            throw new UnsupportedOperationException("Cannot read data task.");
+            throw new IcebergConnectorException(
+                CommonErrorCode.UNSUPPORTED_OPERATION,
+                "Cannot read data task.");
         }
         InputFile input = fileIO.newInputFile(task.file().path().toString());
         Map<Integer, ?> partition = PartitionUtil.constantsMap(task, 
IdentityPartitionConverters::convertConstant);
@@ -131,8 +135,10 @@ public class IcebergFileScanTaskReader implements 
Closeable {
                     .filter(task.residual());
                 return orc.build();
             default:
-                throw new UnsupportedOperationException(String.format("Cannot 
read %s file: %s",
-                    task.file().format().name(), task.file().path()));
+                throw new IcebergConnectorException(
+                    CommonErrorCode.UNSUPPORTED_OPERATION,
+                    String.format("Cannot read %s file: %s",
+                        task.file().format().name(), task.file().path()));
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
index c170dbd90..6667fc897 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/reader/IcebergFileScanTaskSplitReader.java
@@ -19,6 +19,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.reader;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.data.Deserializer;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
 
 import lombok.AllArgsConstructor;
@@ -38,7 +40,7 @@ public class IcebergFileScanTaskSplitReader implements 
Closeable {
     public CloseableIterator<SeaTunnelRow> open(@NonNull 
IcebergFileScanTaskSplit split) {
         CloseableIterator<Record> iterator = 
icebergFileScanTaskReader.open(split.getTask());
 
-        OffsetSeekIterator<Record> seekIterator = new 
OffsetSeekIterator(iterator);
+        OffsetSeekIterator<Record> seekIterator = new 
OffsetSeekIterator<>(iterator);
         seekIterator.seek(split.getRecordOffset());
 
         return CloseableIterator.transform(seekIterator, record -> {
@@ -62,8 +64,10 @@ public class IcebergFileScanTaskSplitReader implements 
Closeable {
                 if (hasNext()) {
                     next();
                 } else {
-                    throw new IllegalStateException(String.format(
-                        "Invalid starting record offset %d", 
startingRecordOffset));
+                    throw new IcebergConnectorException(
+                        
IcebergConnectorErrorCode.INVALID_STARTING_RECORD_OFFSET,
+                        String.format(
+                            "Invalid starting record offset %d", 
startingRecordOffset));
                 }
             }
         }

Reply via email to