This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new a76aebc005e [fix](multi-catalog)add the max compute fe ut and fix download expired  #27007 (#27220)
a76aebc005e is described below

commit a76aebc005ee78c1108a2a143d6b0f59ab58b258
Author: slothever <[email protected]>
AuthorDate: Mon Nov 20 10:42:28 2023 +0800

    [fix](multi-catalog)add the max compute fe ut and fix download expired  #27007 (#27220)
    
    bp #27007
---
 fe/be-java-extensions/max-compute-scanner/pom.xml  |   9 ++
 .../doris/maxcompute/MaxComputeJniScanner.java     |  41 ++++--
 .../doris/maxcompute/MaxComputeJniScannerTest.java | 158 +++++++++++++++++++++
 .../test_external_catalog_maxcompute.out           |   3 +
 .../test_external_catalog_maxcompute.groovy        |  17 +++
 5 files changed, 217 insertions(+), 11 deletions(-)

diff --git a/fe/be-java-extensions/max-compute-scanner/pom.xml 
b/fe/be-java-extensions/max-compute-scanner/pom.xml
index 8e057da8074..9f9fa50e972 100644
--- a/fe/be-java-extensions/max-compute-scanner/pom.xml
+++ b/fe/be-java-extensions/max-compute-scanner/pom.xml
@@ -98,6 +98,15 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <forkCount>${fe_ut_parallel}</forkCount>
+                    <reuseForks>false</reuseForks>
+                    <useFile>false</useFile>
+                    
<argLine>-javaagent:${settings.localRepository}/org/jmockit/jmockit/${jmockit.version}/jmockit-${jmockit.version}.jar</argLine>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index 5f4125ec4ed..0d80546cdfb 100644
--- 
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -26,6 +26,7 @@ import com.aliyun.odps.OdpsType;
 import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.data.ArrowRecordReader;
 import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
 import com.aliyun.odps.type.TypeInfo;
 import com.aliyun.odps.type.TypeInfoFactory;
 import com.google.common.base.Strings;
@@ -60,27 +61,30 @@ public class MaxComputeJniScanner extends JniScanner {
     private static final String START_OFFSET = "start_offset";
     private static final String SPLIT_SIZE = "split_size";
     private static final String PUBLIC_ACCESS = "public_access";
+    private final RootAllocator arrowAllocator = new 
RootAllocator(Integer.MAX_VALUE);
     private final Map<String, MaxComputeTableScan> tableScans = new 
ConcurrentHashMap<>();
     private final String region;
     private final String project;
     private final String table;
     private PartitionSpec partitionSpec;
     private Set<String> partitionColumns;
-    private final MaxComputeTableScan curTableScan;
+    private MaxComputeTableScan curTableScan;
     private MaxComputeColumnValue columnValue;
     private long remainBatchRows = 0;
     private long totalRows = 0;
-    private RootAllocator arrowAllocator;
     private ArrowRecordReader curReader;
     private List<Column> readColumns;
     private Map<String, Integer> readColumnsToId;
     private long startOffset = -1L;
+    private int retryCount = 2;
     private long splitSize = -1L;
+    private final Map<String, String> refreshParams;
 
     public MaxComputeJniScanner(int batchSize, Map<String, String> params) {
         region = Objects.requireNonNull(params.get(REGION), "required property 
'" + REGION + "'.");
         project = Objects.requireNonNull(params.get(PROJECT), "required 
property '" + PROJECT + "'.");
         table = Objects.requireNonNull(params.get(TABLE), "required property 
'" + TABLE + "'.");
+        refreshParams = params;
         tableScans.putIfAbsent(tableUniqKey(), newTableScan(params));
         curTableScan = tableScans.get(tableUniqKey());
         String partitionSpec = params.get(PARTITION_SPEC);
@@ -104,6 +108,11 @@ public class MaxComputeJniScanner extends JniScanner {
         initTableInfo(columnTypes, requiredFields, predicates, batchSize);
     }
 
+    public void refreshTableScan() {
+        curTableScan = newTableScan(refreshParams);
+        tableScans.put(tableUniqKey(), curTableScan);
+    }
+
     private MaxComputeTableScan newTableScan(Map<String, String> params) {
         if (!Strings.isNullOrEmpty(params.get(START_OFFSET))
                 && !Strings.isNullOrEmpty(params.get(SPLIT_SIZE))) {
@@ -132,6 +141,10 @@ public class MaxComputeJniScanner extends JniScanner {
                 readColumnsToId.put(fields[i], i);
             }
         }
+    }
+
+    @Override
+    public void open() throws IOException {
         // reorder columns
         List<Column> columnList = curTableScan.getSchema().getColumns();
         columnList.addAll(curTableScan.getSchema().getPartitionColumns());
@@ -142,10 +155,6 @@ public class MaxComputeJniScanner extends JniScanner {
         // Downloading columns data from Max compute only supports the order 
of table metadata.
         // We might get an error message if no sort here: Column reorder is 
not supported in legacy arrow mode.
         readColumns.sort((Comparator.comparing(o -> 
columnRank.get(o.getName()))));
-    }
-
-    @Override
-    public void open() throws IOException {
         if (readColumns.isEmpty()) {
             return;
         }
@@ -159,19 +168,27 @@ public class MaxComputeJniScanner extends JniScanner {
             long start = startOffset == -1L ? 0 : startOffset;
             long recordCount = session.getRecordCount();
             totalRows = splitSize > 0 ? Math.min(splitSize, recordCount) : 
recordCount;
-
-            arrowAllocator = new RootAllocator(Long.MAX_VALUE);
             partitionColumns = 
session.getSchema().getPartitionColumns().stream()
                     .map(Column::getName)
                     .collect(Collectors.toSet());
             List<Column> maxComputeColumns = new ArrayList<>(readColumns);
             maxComputeColumns.removeIf(e -> 
partitionColumns.contains(e.getName()));
             curReader = session.openArrowRecordReader(start, totalRows, 
maxComputeColumns, arrowAllocator);
+            remainBatchRows = totalRows;
+        } catch (TunnelException e) {
+            if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) {
+                retryCount--;
+                // try to refresh table scan and re-open odps
+                refreshTableScan();
+                open();
+            } else {
+                retryCount = 2;
+                throw new IOException(e);
+            }
         } catch (Exception e) {
             close();
             throw new IOException(e);
         }
-        remainBatchRows = totalRows;
     }
 
     private Column createOdpsColumn(int colIdx, ColumnType dorisType) {
@@ -237,7 +254,7 @@ public class MaxComputeJniScanner extends JniScanner {
         startOffset = -1;
         splitSize = -1;
         if (curReader != null) {
-            arrowAllocator.close();
+            arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
             curReader.close();
             curReader = null;
         }
@@ -279,7 +296,9 @@ public class MaxComputeJniScanner extends JniScanner {
                         Integer readColumnId = 
readColumnsToId.get(partitionColumn);
                         if (readColumnId != null && partitionValue != null) {
                             MaxComputePartitionValue value = new 
MaxComputePartitionValue(partitionValue);
-                            appendData(readColumnId, value);
+                            for (int i = 0; i < batchRows; i++) {
+                                appendData(readColumnId, value);
+                            }
                         }
                     }
                 }
diff --git 
a/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
 
b/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
new file mode 100644
index 00000000000..f14c0610d43
--- /dev/null
+++ 
b/fe/be-java-extensions/max-compute-scanner/src/test/java/org/apache/doris/maxcompute/MaxComputeJniScannerTest.java
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.maxcompute;
+
+import com.aliyun.odps.Column;
+import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.TableSchema;
+import com.aliyun.odps.data.ArrowRecordReader;
+import com.aliyun.odps.tunnel.TableTunnel;
+import com.aliyun.odps.tunnel.TunnelException;
+import com.aliyun.odps.type.TypeInfoFactory;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import org.apache.arrow.memory.BufferAllocator;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MaxComputeJniScannerTest {
+
+    @Mocked
+    private TableTunnel.DownloadSession session;
+    private Map<String, String> paramsMc = new HashMap<String, String>() {
+        {
+            put("region", "cn-beijing");
+            put("project", "test_pj");
+            put("table", "test_tb");
+            put("access_key", "ak");
+            put("secret_key", "sk");
+            put("start_offset", "0");
+            put("split_size", "128");
+            put("partition_spec", "p1=2022-06");
+            put("required_fields", 
"boolean,tinyint,smallint,int,bigint,float,double,"
+                    + "date,timestamp,char,varchar,string,decimalv2,decimal64,"
+                    + "decimal18,timestamp4");
+            put("columns_types", 
"boolean#tinyint#smallint#int#bigint#float#double#"
+                    + 
"date#timestamp#char(10)#varchar(10)#string#decimalv2(12,4)#decimal64(10,3)#"
+                    + "decimal(18,5)#timestamp(4)");
+        }
+    };
+    private MaxComputeJniScanner scanner = new MaxComputeJniScanner(32, 
paramsMc);
+
+    @BeforeEach
+    public void init() {
+        new MockUp<MaxComputeJniScanner>(MaxComputeJniScanner.class) {
+            @Mock
+            public TableSchema getSchema() {
+                return getTestSchema();
+            }
+        };
+        new MockUp<MaxComputeTableScan>(MaxComputeTableScan.class) {
+            @Mock
+            public TableSchema getSchema() {
+                return getTestSchema();
+            }
+
+            @Mock
+            public TableTunnel.DownloadSession openDownLoadSession() throws 
IOException {
+                return session;
+            }
+
+            @Mock
+            public TableTunnel.DownloadSession 
openDownLoadSession(PartitionSpec partitionSpec) throws IOException {
+                return session;
+            }
+        };
+        new 
MockUp<TableTunnel.DownloadSession>(TableTunnel.DownloadSession.class) {
+            @Mock
+            public TableSchema getSchema() {
+                return getTestSchema();
+            }
+
+            @Mock
+            public long getRecordCount() {
+                return 10;
+            }
+
+            @Mock
+            public ArrowRecordReader openArrowRecordReader(long start, long 
count, List<Column> columns,
+                                                           BufferAllocator 
allocator)
+                    throws TunnelException, IOException {
+                return null;
+            }
+        };
+    }
+
+    private TableSchema getTestSchema() {
+        TableSchema schema = new TableSchema();
+        schema.addColumn(new Column("boolean", TypeInfoFactory.BOOLEAN));
+        schema.addColumn(new Column("bigint", TypeInfoFactory.BIGINT));
+        schema.addPartitionColumn(new Column("date", TypeInfoFactory.DATE));
+        schema.addPartitionColumn(new Column("tinyint", 
TypeInfoFactory.TINYINT));
+        schema.addPartitionColumn(new Column("smallint", 
TypeInfoFactory.SMALLINT));
+        schema.addPartitionColumn(new Column("int", TypeInfoFactory.INT));
+        schema.addPartitionColumn(new Column("timestamp", 
TypeInfoFactory.TIMESTAMP));
+        schema.addPartitionColumn(new Column("char", 
TypeInfoFactory.getCharTypeInfo(10)));
+        schema.addPartitionColumn(new Column("varchar", 
TypeInfoFactory.getVarcharTypeInfo(10)));
+        schema.addPartitionColumn(new Column("string", 
TypeInfoFactory.STRING));
+        schema.addPartitionColumn(new Column("float", TypeInfoFactory.FLOAT));
+        schema.addPartitionColumn(new Column("double", 
TypeInfoFactory.DOUBLE));
+        schema.addPartitionColumn(new Column("decimalv2",
+                TypeInfoFactory.getDecimalTypeInfo(12, 4)));
+        schema.addPartitionColumn(new Column("decimal64",
+                TypeInfoFactory.getDecimalTypeInfo(10, 3)));
+        schema.addPartitionColumn(new Column("decimal18",
+                TypeInfoFactory.getDecimalTypeInfo(18, 5)));
+        schema.addPartitionColumn(new Column("timestamp4", 
TypeInfoFactory.TIMESTAMP));
+        return schema;
+    }
+
+    @Test
+    public void testMaxComputeJniScanner() throws IOException {
+        scanner.open();
+        scanner.getNext();
+        scanner.close();
+    }
+
+    @Test
+    public void testMaxComputeJniScannerErr() {
+        try {
+            new 
MockUp<TableTunnel.DownloadSession>(TableTunnel.DownloadSession.class) {
+                @Mock
+                public ArrowRecordReader openArrowRecordReader(long start, 
long count, List<Column> columns,
+                                                               BufferAllocator 
allocator)
+                        throws TunnelException, IOException {
+                    throw new TunnelException("TableModified");
+                }
+            };
+            scanner.open();
+            scanner.getNext();
+            scanner.close();
+        } catch (IOException e) {
+            Assertions.assertTrue(e.getCause() instanceof TunnelException);
+            Assertions.assertEquals(((TunnelException) 
e.getCause()).getErrorMsg(), "TableModified");
+        }
+    }
+}
diff --git 
a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
 
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
index 5fc7ade4894..6cd91cf2ee3 100644
--- 
a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
+++ 
b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out
@@ -22,3 +22,6 @@ true  77      8920    182239402452
 -- !q7 --
 6223   maxam   2020-09-21
 9601   qewtoll 2020-09-21
+
+-- !replay_q6 --
+9601   qewtoll 2020-09-21
diff --git 
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
index 6b050e277a8..23d0c0b252d 100644
--- 
a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
+++ 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy
@@ -51,10 +51,27 @@ suite("test_external_catalog_maxcompute", 
"p2,external,maxcompute,external_remot
             qt_q6 """ select * from mc_parts where dt = '2020-09-21' and 
mc_bigint > 6223 """
             qt_q7 """ select * from mc_parts where dt = '2020-09-21' or 
mc_bigint > 0 """
         }
+
         sql """ switch `${mc_catalog_name}`; """
         sql """ use `${mc_db}`; """
         q01()
         q02()
         q03()
+
+        // replay test
+        sql """drop catalog if exists ${mc_catalog_name};"""
+        sql """
+            create catalog if not exists ${mc_catalog_name} properties (
+                "type" = "max_compute",
+                "mc.region" = "cn-beijing",
+                "mc.default.project" = "${mc_db}",
+                "mc.access_key" = "${ak}",
+                "mc.secret_key" = "${sk}",
+                "mc.public_access" = "true"
+            );
+        """
+        sql """ switch `${mc_catalog_name}`; """
+        sql """ use `${mc_db}`; """
+        qt_replay_q6 """ select * from mc_parts where dt = '2020-09-21' and 
mc_bigint > 6223 """
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to