This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new db69457576 [fix](avro)Fix S3 TVF avro format reading failure (#22199)
db69457576 is described below
commit db69457576f9cef089dab56c5dd93b76a631dd7a
Author: DongLiang-0 <[email protected]>
AuthorDate: Fri Aug 11 17:22:48 2023 +0800
[fix](avro)Fix S3 TVF avro format reading failure (#22199)
This pr fixes two issues:
1. when using the S3 TVF to query files in AVRO format, due to the change of
`TFileType`, the originally queried `FILE_S3` became `FILE_LOCAL`, causing
the query to fail.
2. currently, both parameters `s3.virtual.key` and `s3.virtual.bucket` are
removed. A new `S3Utils` class in jni-avro parses the bucket and key of S3.
The main purpose of this change is to unify the S3 parameters.
---
be/src/vec/exec/format/avro/avro_jni_reader.cpp | 31 ++++--
be/src/vec/exec/format/avro/avro_jni_reader.h | 4 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 4 +-
.../java/org/apache/doris/avro/AvroJNIScanner.java | 4 +-
.../java/org/apache/doris/avro/AvroProperties.java | 2 -
.../java/org/apache/doris/avro/HDFSFileReader.java | 9 +-
.../java/org/apache/doris/avro/S3FileReader.java | 20 ++--
.../main/java/org/apache/doris/avro/S3Utils.java | 109 +++++++++++++++++++++
.../property/constants/S3Properties.java | 2 -
.../doris/tablefunction/S3TableValuedFunction.java | 11 +--
.../datasource/property/PropertyConverterTest.java | 4 +-
11 files changed, 160 insertions(+), 40 deletions(-)
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index ffca8682c4..5a347a0e59 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -27,8 +27,13 @@ namespace doris::vectorized {
AvroJNIReader::AvroJNIReader(RuntimeState* state, RuntimeProfile* profile,
const TFileScanRangeParams& params,
- const std::vector<SlotDescriptor*>&
file_slot_descs)
- : _file_slot_descs(file_slot_descs), _state(state), _profile(profile),
_params(params) {}
+ const std::vector<SlotDescriptor*>&
file_slot_descs,
+ const TFileRangeDesc& range)
+ : _file_slot_descs(file_slot_descs),
+ _state(state),
+ _profile(profile),
+ _params(params),
+ _range(range) {}
AvroJNIReader::AvroJNIReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
const TFileRangeDesc& range,
@@ -74,13 +79,7 @@ Status AvroJNIReader::init_fetch_table_reader(
index++;
}
- TFileType::type type;
- if (_range.__isset.file_type) {
- // for compatibility
- type = _range.file_type;
- } else {
- type = _params.file_type;
- }
+ TFileType::type type = get_file_type();
std::map<String, String> required_param = {
{"required_fields", required_fields.str()},
{"columns_types", columns_types.str()},
@@ -92,6 +91,7 @@ Status AvroJNIReader::init_fetch_table_reader(
required_param.insert(std::make_pair("uri",
_params.hdfs_params.hdfs_conf.data()->value));
break;
case TFileType::FILE_S3:
+ required_param.insert(std::make_pair("uri", _range.path));
required_param.insert(_params.properties.begin(),
_params.properties.end());
break;
default:
@@ -104,9 +104,20 @@ Status AvroJNIReader::init_fetch_table_reader(
return _jni_connector->open(_state, _profile);
}
+TFileType::type AvroJNIReader::get_file_type() {
+ TFileType::type type;
+ if (_range.__isset.file_type) {
+ // for compatibility
+ type = _range.file_type;
+ } else {
+ type = _params.file_type;
+ }
+ return type;
+}
+
Status AvroJNIReader::init_fetch_table_schema_reader() {
std::map<String, String> required_param = {{"uri", _range.path},
- {"file_type",
std::to_string(_params.file_type)},
+ {"file_type",
std::to_string(get_file_type())},
{"is_get_table_schema",
"true"}};
required_param.insert(_params.properties.begin(),
_params.properties.end());
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.h
b/be/src/vec/exec/format/avro/avro_jni_reader.h
index 70ef859fba..8f8d9f6683 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.h
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.h
@@ -56,7 +56,7 @@ public:
* Call java side by jni to get table data.
*/
AvroJNIReader(RuntimeState* state, RuntimeProfile* profile, const
TFileScanRangeParams& params,
- const std::vector<SlotDescriptor*>& file_slot_descs);
+ const std::vector<SlotDescriptor*>& file_slot_descs, const
TFileRangeDesc& range);
/**
* Call java side by jni to get table schema.
@@ -74,6 +74,8 @@ public:
Status init_fetch_table_reader(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+ TFileType::type get_file_type();
+
Status init_fetch_table_schema_reader();
Status get_parsed_schema(std::vector<std::string>* col_names,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 513c384930..6365f60ed6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -775,8 +775,8 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_AVRO: {
- _cur_reader =
- AvroJNIReader::create_unique(_state, _profile, *_params,
_file_slot_descs);
+ _cur_reader = AvroJNIReader::create_unique(_state, _profile,
*_params, _file_slot_descs,
+ range);
init_status = ((AvroJNIReader*)(_cur_reader.get()))
->init_fetch_table_reader(_colname_to_value_range);
break;
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index d3e1cad579..11bce610d7 100644
---
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -146,13 +146,11 @@ public class AvroJNIScanner extends JniScanner {
this.avroReader = new HDFSFileReader(uri);
break;
case FILE_S3:
- String bucketName =
requiredParams.get(AvroProperties.S3_BUCKET);
- String key = requiredParams.get(AvroProperties.S3_KEY);
String accessKey =
requiredParams.get(AvroProperties.S3_ACCESS_KEY);
String secretKey =
requiredParams.get(AvroProperties.S3_SECRET_KEY);
String endpoint =
requiredParams.get(AvroProperties.S3_ENDPOINT);
String region = requiredParams.get(AvroProperties.S3_REGION);
- this.avroReader = new S3FileReader(accessKey, secretKey,
endpoint, region, bucketName, key);
+ this.avroReader = new S3FileReader(accessKey, secretKey,
endpoint, region, uri);
break;
default:
LOG.warn("Unsupported " + fileType.name() + " file type.");
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
index 11066b5e08..6619b6888c 100644
---
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroProperties.java
@@ -27,8 +27,6 @@ public class AvroProperties {
protected static final String REQUIRED_FIELDS = "required_fields";
protected static final String FILE_TYPE = "file_type";
protected static final String URI = "uri";
- protected static final String S3_BUCKET = "s3.virtual.bucket";
- protected static final String S3_KEY = "s3.virtual.key";
protected static final String S3_ACCESS_KEY = "s3.access_key";
protected static final String S3_SECRET_KEY = "s3.secret_key";
protected static final String S3_ENDPOINT = "s3.endpoint";
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
index 7200b4dde6..8c18970402 100644
---
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/HDFSFileReader.java
@@ -30,6 +30,7 @@ import org.apache.log4j.Logger;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.URI;
+import java.util.Objects;
public class HDFSFileReader implements AvroReader {
private static final Logger LOG =
LogManager.getLogger(HDFSFileReader.class);
@@ -67,7 +68,11 @@ public class HDFSFileReader implements AvroReader {
@Override
public void close() throws IOException {
- inputStream.close();
- reader.close();
+ if (Objects.nonNull(inputStream)) {
+ inputStream.close();
+ }
+ if (Objects.nonNull(reader)) {
+ reader.close();
+ }
}
}
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
index bd966eb31c..4b1b4a864c 100644
---
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3FileReader.java
@@ -35,6 +35,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Objects;
public class S3FileReader implements AvroReader {
@@ -48,13 +49,14 @@ public class S3FileReader implements AvroReader {
private final String endpoint;
private final String region;
- public S3FileReader(String accessKey, String secretKey, String endpoint,
String region,
- String bucketName, String key) {
- this.bucketName = bucketName;
- this.key = key;
+ public S3FileReader(String accessKey, String secretKey, String endpoint,
String region, String uri)
+ throws IOException {
this.endpoint = endpoint;
this.region = region;
- credentials = new BasicAWSCredentials(accessKey, secretKey);
+ this.credentials = new BasicAWSCredentials(accessKey, secretKey);
+ S3Utils.parseURI(uri);
+ this.bucketName = S3Utils.getBucket();
+ this.key = S3Utils.getKey();
}
@Override
@@ -85,7 +87,11 @@ public class S3FileReader implements AvroReader {
@Override
public void close() throws IOException {
- s3ObjectInputStream.close();
- reader.close();
+ if (Objects.nonNull(s3ObjectInputStream)) {
+ s3ObjectInputStream.close();
+ }
+ if (Objects.nonNull(reader)) {
+ reader.close();
+ }
}
}
diff --git
a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
new file mode 100644
index 0000000000..85ac4893cf
--- /dev/null
+++
b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/S3Utils.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+
+public class S3Utils {
+ private static final String SCHEMA_S3 = "s3";
+ private static final String SCHEMA_HTTP = "http";
+ private static final String SCHEMA_HTTPS = "https";
+ private static final String SCHEME_DELIM = "://";
+ private static final String PATH_DELIM = "/";
+ private static final String QUERY_DELIM = "\\?";
+ private static final String FRAGMENT_DELIM = "#";
+ private static String bucket;
+ private static String key;
+
+ /**
+ * eg:
+ * s3: s3://bucket1/path/to/file.txt
+ * http: http://10.10.10.1:9000/bucket1/to/file.txt
+ * https: https://10.10.10.1:9000/bucket1/to/file.txt
+ * <p>
+ * schema: s3,http,https
+ * bucket: bucket1
+ * key: path/to/file.txt
+ */
+ public static void parseURI(String uri) throws IOException {
+ if (StringUtils.isEmpty(uri)) {
+ throw new IOException("s3 uri is empty.");
+ }
+ String[] schemeSplit = uri.split(SCHEME_DELIM);
+ String rest;
+ if (schemeSplit.length == 2) {
+ if (schemeSplit[0].equalsIgnoreCase(SCHEMA_S3)) {
+ // has scheme, eg: s3://bucket1/path/to/file.txt
+ rest = schemeSplit[1];
+ String[] authoritySplit = rest.split(PATH_DELIM, 2);
+ if (authoritySplit.length < 1) {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+ bucket = authoritySplit[0];
+ // support s3://bucket1
+ key = authoritySplit.length == 1 ? "/" : authoritySplit[1];
+ } else if (schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTP) ||
schemeSplit[0].equalsIgnoreCase(SCHEMA_HTTPS)) {
+ // has scheme, eg: http(s)://host/bucket1/path/to/file.txt
+ rest = schemeSplit[1];
+ String[] authoritySplit = rest.split(PATH_DELIM, 3);
+ if (authoritySplit.length != 3) {
+ throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+ }
+ // authority_split[1] is host
+ bucket = authoritySplit[1];
+ key = authoritySplit[2];
+ } else {
+ throw new IOException("Invalid S3 HTTP URI: uri=" + uri);
+ }
+
+ } else if (schemeSplit.length == 1) {
+ // no scheme, eg: path/to/file.txt
+ bucket = ""; // unknown
+ key = uri;
+ } else {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+
+ key = key.trim();
+ if (StringUtils.isEmpty(key)) {
+ throw new IOException("Invalid S3 URI. uri=" + uri);
+ }
+ // Strip query and fragment if they exist
+ String[] querySplit = key.split(QUERY_DELIM);
+ String[] fragmentSplit = querySplit[0].split(FRAGMENT_DELIM);
+ key = fragmentSplit[0];
+ }
+
+ public static String getBucket() {
+ return bucket;
+ }
+
+ public static String getKey() {
+ return key;
+ }
+
+ public static void main(String[] args) throws IOException {
+ S3Utils.parseURI("https://10.10.10.1:9000/bucket1/path/person.avro");
+ String bucket1 = S3Utils.getBucket();
+ String key1 = S3Utils.getKey();
+ System.out.println(bucket1 + " " + key1);
+ }
+
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
index 3d3a003466..a1da49257b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java
@@ -55,8 +55,6 @@ public class S3Properties extends BaseProperties {
// required by storage policy
public static final String ROOT_PATH = "s3.root.path";
public static final String BUCKET = "s3.bucket";
- public static final String VIRTUAL_BUCKET = "s3.virtual.bucket";
- public static final String VIRTUAL_KEY = "s3.virtual.key";
public static final String VALIDITY_CHECK = "s3_validity_check";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT,
ACCESS_KEY, SECRET_KEY);
public static final List<String> TVF_REQUIRED_FIELDS =
Arrays.asList(ACCESS_KEY, SECRET_KEY);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 9f6339b0c2..300c51c7ad 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -68,7 +68,6 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
private final S3URI s3uri;
private final boolean forceVirtualHosted;
private String virtualBucket = "";
- private String virtualKey;
public S3TableValuedFunction(Map<String, String> params) throws
AnalysisException {
Map<String, String> tvfParams = getValidParams(params);
@@ -94,8 +93,6 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
locationProperties = S3Properties.credentialToMap(credential);
String usePathStyle =
tvfParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
- locationProperties.put(S3Properties.VIRTUAL_BUCKET, virtualBucket);
- locationProperties.put(S3Properties.VIRTUAL_KEY, getVirtualKey());
parseProperties(tvfParams);
if (FeConstants.runningUnitTest) {
@@ -125,11 +122,6 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
return S3Properties.requiredS3TVFProperties(validParams);
}
- private String getVirtualKey() {
- virtualKey = s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
- return virtualKey;
- }
-
private String getEndpointAndSetVirtualBucket(Map<String, String> params)
throws AnalysisException {
Preconditions.checkState(forceVirtualHosted, "only invoked when force
virtual hosted.");
String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
@@ -176,7 +168,8 @@ public class S3TableValuedFunction extends
ExternalFileTableValuedFunction {
public String getFilePath() {
// must be "s3://..."
if (forceVirtualHosted) {
- return NAME + S3URI.SCHEME_DELIM + virtualBucket +
S3URI.PATH_DELIM + virtualKey;
+ return NAME + S3URI.SCHEME_DELIM + virtualBucket + S3URI.PATH_DELIM
+ + s3uri.getBucket() + S3URI.PATH_DELIM + s3uri.getKey();
}
return NAME + S3URI.SCHEME_DELIM + s3uri.getKey();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
index e945eff9b7..ed58d8c4b7 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/PropertyConverterTest.java
@@ -230,7 +230,7 @@ public class PropertyConverterTest extends
TestWithFeService {
Assertions.assertEquals(analyzedStmt.getTableRefs().size(), 1);
TableValuedFunctionRef oldFuncTable = (TableValuedFunctionRef)
analyzedStmt.getTableRefs().get(0);
S3TableValuedFunction s3Tvf = (S3TableValuedFunction)
oldFuncTable.getTableFunction();
- Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(),
11);
+ Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(),
9);
String queryNew = "select * from s3(\n"
+ " 'uri' =
'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
@@ -243,7 +243,7 @@ public class PropertyConverterTest extends
TestWithFeService {
Assertions.assertEquals(analyzedStmtNew.getTableRefs().size(), 1);
TableValuedFunctionRef newFuncTable = (TableValuedFunctionRef)
analyzedStmt.getTableRefs().get(0);
S3TableValuedFunction newS3Tvf = (S3TableValuedFunction)
newFuncTable.getTableFunction();
-
Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 11);
+
Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 9);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]