This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new db69457576 [fix](avro)Fix S3 TVF avro format reading failure (#22199)
db69457576 is described below

commit db69457576f9cef089dab56c5dd93b76a631dd7a
Author: DongLiang-0 <[email protected]>
AuthorDate: Fri Aug 11 17:22:48 2023 +0800

    [fix](avro)Fix S3 TVF avro format reading failure (#22199)
    
    This pr fixes two issues:
    
    1. when using s3 TVF to query files in AVRO format, due to the change of
`TFileType`, the originally queried `FILE_S3` becomes `FILE_LOCAL`, causing
the query to fail.
    2. currently, both parameters `s3.virtual.key` and `s3.virtual.bucket` are
removed. A new `S3Utils` is added in jni-avro to parse the bucket and key of S3.
    The purpose of this change is mainly to unify the S3 parameters.
---
 be/src/vec/exec/format/avro/avro_jni_reader.cpp    |  31 ++++--
 be/src/vec/exec/format/avro/avro_jni_reader.h      |   4 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             |   4 +-
 .../java/org/apache/doris/avro/AvroJNIScanner.java |   4 +-
 .../java/org/apache/doris/avro/AvroProperties.java |   2 -
 .../java/org/apache/doris/avro/HDFSFileReader.java |   9 +-
 .../java/org/apache/doris/avro/S3FileReader.java   |  20 ++--
 .../main/java/org/apache/doris/avro/S3Utils.java   | 109 +++++++++++++++++++++
 .../property/constants/S3Properties.java           |   2 -
 .../doris/tablefunction/S3TableValuedFunction.java |  11 +--
 .../datasource/property/PropertyConverterTest.java |   4 +-
 11 files changed, 160 insertions(+), 40 deletions(-)

diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp 
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index ffca8682c4..5a347a0e59 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -27,8 +27,13 @@ namespace doris::vectorized {
 
 AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile,
                              const TFileScanRangeParams& params,
-                             const std::vector<SlotDescriptor*>& 
file_slot_descs)
-        : _file_slot_descs(file_slot_descs), _state(state), _profile(profile), 
_params(params) {}
+                             const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             const TFileRangeDesc& range)
+        : _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile),
+          _params(params),
+          _range(range) {}
 
 AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
                              const TFileRangeDesc& range,
@@ -74,13 +79,7 @@ Status AvroJNIReader::init_fetch_table_reader(
         index++;
     }
 
-    TFileType::type type;
-    if (_range.__isset.file_type) {
-        // for compatibility
-        type = _range.file_type;
-    } else {
-        type = _params.file_type;
-    }
+    TFileType::type type = get_file_type();
     std::map<String, String> required_param = {
             {"required_fields", required_fields.str()},
             {"columns_types", columns_types.str()},
@@ -92,6 +91,7 @@ Status AvroJNIReader::init_fetch_table_reader(
         required_param.insert(std::make_pair("uri", 
_params.hdfs_params.hdfs_conf.data()->value));
         break;
     case TFileType::FILE_S3:
+        required_param.insert(std::make_pair("uri", _range.path));
         required_param.insert(_params.properties.begin(), 
_params.properties.end());
         break;
     default:
@@ -104,9 +104,20 @@ Status AvroJNIReader::init_fetch_table_reader(
     return _jni_connector->open(_state, _profile);
 }
 
+TFileType::type AvroJNIReader::get_file_type() {
+    TFileType::type type;
+    if (_range.__isset.file_type) {
+        // for compatibility
+        type = _range.file_type;
+    } else {
+        type = _params.file_type;
+    }
+    return type;
+}
+
 Status AvroJNIReader::init_fetch_table_schema_reader() {
     std::map<String, String> required_param = {{"uri", _range.path},
-                                               {"file_type", 
std::to_string(_params.file_type)},
+                                               {"file_type", 
std::to_string(get_file_type())},
                                                {"is_get_table_schema", 
"true"}};
 
     required_param.insert(_params.properties.begin(), 
_params.properties.end());
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h 
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index 70ef859fba..8f8d9f6683 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -56,7 +56,7 @@ public:
      * Call java side by jni to get table data.
      */
     AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const 
TFileScanRangeParams& params,
-                  const std::vector<SlotDescriptor*>& file_slot_descs);
+                  const std::vector<SlotDescriptor*>& file_slot_descs, const 
TFileRangeDesc& range);
 
     /**
      * Call java side by jni to get table schema.
@@ -74,6 +74,8 @@ public:
     Status init_fetch_table_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
+    TFileType::type get_file_type();
+
     Status init_fetch_table_schema_reader();
 
     Status get_parsed_schema(std::vector<std::string>* col_names,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 513c384930..6365f60ed6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -775,8 +775,8 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_AVRO: {
-            _cur_reader =
-                    AvroJNIReader::create_unique(_state, _profile, *_params, 
_file_slot_descs);
+            _cur_reader = AvroJNIReader::create_unique(_state, _profile, 
*_params, _file_slot_descs,
+                                                       range);
             init_status = ((AvroJNIReader*)(_cur_reader.get()))
                                   
->init_fetch_table_reader(_colname_to_value_range);
             break;
diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index d3e1cad579..11bce610d7 100644
--- 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++ 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -146,13 +146,11 @@ public class AvroJNIScanner extends JniScanner {
                 this.avroReader = new HDFSFileReader(uri);
                 break;
             case FILE_S3:
-                String bucketName = 
requiredParams.get(AvroProperties.S3_BUCKET);
-                String key = requiredParams.get(AvroProperties.S3_KEY);
                 String accessKey = 
requiredParams.get(AvroProperties.S3_ACCESS_KEY);
                 String secretKey = 
requiredParams.get(AvroProperties.S3_SECRET_KEY);
                 String endpoint = 
requiredParams.get(AvroProperties.S3_ENDPOINT);
                 String region = requiredParams.get(AvroProperties.S3_REGION);
-                this.avroReader = new S3FileReader(accessKey, secretKey, 
endpoint, region, bucketName, key);
+                this.avroReader = new S3FileReader(accessKey, secretKey, 
endpoint, region, uri);
                 break;
             default:
                 LOG.warn("Unsupported " + fileType.name() + " file type.");
diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
index 11066b5e08..6619b6888c 100644
--- 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
+++ 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
@@ -27,8 +27,6 @@ public class AvroProperties {
     protected static final String REQUIRED_FIELDS = "required_fields";
     protected static final String FILE_TYPE = "file_type";
     protected static final String URI = "uri";
-    protected static final String S3_BUCKET = "s3.virtual.bucket";
-    protected static final String S3_KEY = "s3.virtual.key";
     protected static final String S3_ACCESS_KEY = "s3.access_key";
     protected static final String S3_SECRET_KEY = "s3.secret_key";
     protected static final String S3_ENDPOINT = "s3.endpoint";
diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
index 7200b4dde6..8c18970402 100644
--- 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
+++ 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
@@ -30,6 +30,7 @@ import org.apache.log4j.Logger;
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Objects;
 
 public class HDFSFileReader implements AvroReader {
     private static final Logger LOG = 
LogManager.getLogger(HDFSFileReader.class);
@@ -67,7 +68,11 @@ public class HDFSFileReader implements AvroReader {
 
     @Override
     public void close() throws IOException {
-        inputStream.close();
-        reader.close();
+        if (Objects.nonNull(inputStream)) {
+            inputStream.close();
+        }
+        if (Objects.nonNull(reader)) {
+            reader.close();
+        }
     }
 }
diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
index bd966eb31c..4b1b4a864c 100644
--- 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
+++ 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
@@ -35,6 +35,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Objects;
 
 public class S3FileReader implements AvroReader {
 
@@ -48,13 +49,14 @@ public class S3FileReader implements AvroReader {
     private final String endpoint;
     private final String region;
 
-    public S3FileReader(String accessKey, String secretKey, String endpoint, 
String region,
-            String bucketName, String key) {
-        this.bucketName = bucketName;
-        this.key = key;
+    public S3FileReader(String accessKey, String secretKey, String endpoint, 
String region, String uri)
+            throws IOException {
         this.endpoint = endpoint;
         this.region = region;
-        credentials = new BasicAWSCredentials(accessKey, secretKey);
+        this.credentials = new BasicAWSCredentials(accessKey, secretKey);
+        S3Utils.parseURI(uri);
+        this.bucketName = S3Utils.getBucket();
+        this.key = S3Utils.getKey();
     }
 
     @Override
@@ -85,7 +87,11 @@ public class S3FileReader implements AvroReader {
 
     @Override
     public void close() throws IOException {
-        s3ObjectInputStream.close();
-        reader.close();
+        if (Objects.nonNull(s3ObjectInputStream)) {
+            s3ObjectInputStream.close();
+        }
+        if (Objects.nonNull(reader)) {
+            reader.close();
+        }
     }
 }
diff --git 
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
new file mode 100644
index 0000000000..85ac4893cf
--- /dev/null
+++ 
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+public class S3Utils {
+    private static final String SCHEMA_S3 = "s3";
+    private static final String SCHEMA_HTTP = "http";
+    private static final String SCHEMA_HTTPS = "https";
+    private static final String SCHEME_DELIM = "://";
+    private static final String PATH_DELIM = "/";
+    private static final String QUERY_DELIM = "\\?";
+    private static final String FRAGMENT_DELIM = "#";
+    private static String bucket;
+    private static String key;
+
+    /**
+     * eg:
+     * s3:     s3://bucket1/path/to/file.txt
+     * http:   http://10.10.10.1:9000/bucket1/to/file.txt
+     * https:  https://10.10.10.1:9000/bucket1/to/file.txt
+     * <p>
+     * schema: s3,http,https
+     * bucket: bucket1
+     * key:    path/to/file.txt
+     */
+    public static void parseURI(String uri) throws IOException {
+        if (StringUtils.isEmpty(uri)) {
+            throw new IOException("s3 uri is empty.");
+        }
+        String[] schemeSplit = uri.split(SCHEME_DELIM);
+        String rest;
+        if (schemeSplit.length == 2) {
+            if (schemeSplit[0].equalsIgnoreCase(SCHEMA_S3)) {
+                // has scheme, eg: s3://bucket1/path/to/file.txt
+                rest = schemeSplit[1];
+                String[] authoritySplit = rest.split(PATH_DELIM, 2);
+                if (authoritySplit.length < 1) {
+                    throw new IOException("Invalid S3 URI. uri=" + uri);
+                }
+                bucket = authoritySplit[0];
+                // support s3://bucket1
+                key = authoritySplit.length == 1 ? "/" : authoritySplit[1];
+            } else if (schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTP) || 
schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTPS)) {
+                // has scheme, eg: http(s)://host/bucket1/path/to/file.txt
+                rest = schemeSplit[1];
+                String[] authoritySplit = rest.split(PATH_DELIM, 3);
+                if (authoritySplit.length != 3) {
+                    throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+                }
+                // authority_split[1] is host
+                bucket = authoritySplit[1];
+                key = authoritySplit[2];
+            } else {
+                throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+            }
+
+        } else if (schemeSplit.length == 1) {
+            // no scheme, eg: path/to/file.txt
+            bucket = ""; // unknown
+            key = uri;
+        } else {
+            throw new IOException("Invalid S3 URI. uri=" + uri);
+        }
+
+        key = key.trim();
+        if (StringUtils.isEmpty(key)) {
+            throw new IOException("Invalid S3 URI. uri=" + uri);
+        }
+        // Strip query and fragment if they exist
+        String[] querySplit = key.split(QUERY_DELIM);
+        String[] fragmentSplit = querySplit[0].split(FRAGMENT_DELIM);
+        key = fragmentSplit[0];
+    }
+
+    public static String getBucket() {
+        return bucket;
+    }
+
+    public static String getKey() {
+        return key;
+    }
+
+    public static void main(String[] args) throws IOException {
+    S3Utils.parseURI("https://10.10.10.1:9000/bucket1/path/person.avro");
+        String bucket1 = S3Utils.getBucket();
+        String key1 = S3Utils.getKey();
+        System.out.println(bucket1 + "  " + key1);
+    }
+
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 3d3a003466..a1da49257b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -55,8 +55,6 @@ public class S3Properties extends BaseProperties {
     // required by storage policy
     public static final String ROOT_PATH = "s3.root.path";
     public static final String BUCKET = "s3.bucket";
-    public static final String VIRTUAL_BUCKET = "s3.virtual.bucket";
-    public static final String VIRTUAL_KEY = "s3.virtual.key";
     public static final String VALIDITY_CHECK = "s3_validity_check";
     public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, 
ACCESS_KEY, SECRET_KEY);
     public static final List<String> TVF_REQUIRED_FIELDS = 
Arrays.asList(ACCESS_KEY, SECRET_KEY);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 9f6339b0c2..300c51c7ad 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -68,7 +68,6 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
     private final S3URI s3uri;
     private final boolean forceVirtualHosted;
     private String virtualBucket = "";
-    private String virtualKey;
 
     public S3TableValuedFunction(Map<String, String> params) throws 
AnalysisException {
         Map<String, String> tvfParams = getValidParams(params);
@@ -94,8 +93,6 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
         locationProperties = S3Properties.credentialToMap(credential);
         String usePathStyle = 
tvfParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
         locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
-        locationProperties.put(S3Properties.VIRTUAL_BUCKET, virtualBucket);
-        locationProperties.put(S3Properties.VIRTUAL_KEY, getVirtualKey());
 
         parseProperties(tvfParams);
         if (FeConstants.runningUnitTest) {
@@ -125,11 +122,6 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
         return S3Properties.requiredS3TVFProperties(validParams);
     }
 
-    private String getVirtualKey() {
-        virtualKey = s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
-        return virtualKey;
-    }
-
     private String getEndpointAndSetVirtualBucket(Map<String, String> params) 
throws AnalysisException {
         Preconditions.checkState(forceVirtualHosted, "only invoked when force 
virtual hosted.");
         String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
@@ -176,7 +168,8 @@ public class S3TableValuedFunction extends 
ExternalFileTableValuedFunction {
     public String getFilePath() {
         // must be "s3://..."
         if (forceVirtualHosted) {
-            return NAME + S3URI.SCHEME_DELIM + virtualBucket + 
S3URI.PATH_DELIM + virtualKey;
+            return NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
+                    + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
         }
         return NAME + S3URI.SCHEME_DELIM + s3uri.getKey();
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
index e945eff9b7..ed58d8c4b7 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
@@ -230,7 +230,7 @@ public class PropertyConverterTest extends 
TestWithFeService {
         Assertions.assertEquals(analyzedStmt.getTableRefs().size(), 1);
         TableValuedFunctionRef oldFuncTable = (TableValuedFunctionRef) 
analyzedStmt.getTableRefs().get(0);
         S3TableValuedFunction s3Tvf = (S3TableValuedFunction) 
oldFuncTable.getTableFunction();
-        Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 
11);
+        Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 
9);
 
         String queryNew = "select * from s3(\n"
                     + "  'uri' = 
'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
@@ -243,7 +243,7 @@ public class PropertyConverterTest extends 
TestWithFeService {
         Assertions.assertEquals(analyzedStmtNew.getTableRefs().size(), 1);
         TableValuedFunctionRef newFuncTable = (TableValuedFunctionRef) 
analyzedStmt.getTableRefs().get(0);
         S3TableValuedFunction newS3Tvf = (S3TableValuedFunction) 
newFuncTable.getTableFunction();
-        
Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 11);
+        
Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 9);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to