This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new de7272317f [1.2-lts](broker)Support GCS (#20897)
de7272317f is described below
commit de7272317fefca779bbe1054a487c8c1ae0e26c8
Author: Calvin Kirs <[email protected]>
AuthorDate: Mon Aug 7 16:58:10 2023 +0800
[1.2-lts](broker)Support GCS (#20897)
---
.../java/org/apache/doris/analysis/ExportStmt.java | 5 ++-
fs_brokers/apache_hdfs_broker/pom.xml | 19 ++++++++-
.../doris/broker/hdfs/FileSystemManager.java | 46 ++++++++++++++++++++++
3 files changed, 67 insertions(+), 3 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 132cfa3f77..2fe175ac59 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -257,9 +257,10 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("oss")
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn")
- && !schema.equalsIgnoreCase("jfs"))) {
+ && !schema.equalsIgnoreCase("jfs")
+ && !schema.equalsIgnoreCase("gs"))) {
                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
-                        + " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://' or 'jfs://' path.");
+                        + " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gs://' or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
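
For context, here is a minimal standalone sketch of the scheme validation that the ExportStmt change above extends, with "gs" now among the accepted broker URI schemes. Class and method names below are illustrative only and are not part of the Doris codebase.

// Illustrative sketch only (not the Doris class): mirrors the broker-path
// scheme check changed above, with "gs" added for Google Cloud Storage.
import java.net.URI;
import java.util.Arrays;
import java.util.List;

public class BrokerPathSchemeSketch {

    private static final List<String> ALLOWED_SCHEMES = Arrays.asList(
            "hdfs", "afs", "bos", "ofs", "obs", "oss", "s3a", "cosn", "jfs", "gs");

    // Returns true when the path uses one of the broker-supported URI schemes.
    public static boolean isValidBrokerPath(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme != null && ALLOWED_SCHEMES.stream().anyMatch(scheme::equalsIgnoreCase);
    }

    public static void main(String[] args) {
        System.out.println(isValidBrokerPath("gs://my-bucket/export/"));  // true
        System.out.println(isValidBrokerPath("ftp://example/export/"));   // false
    }
}
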
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index 89ed2d372e..c0ba39aec4 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -70,6 +70,7 @@ under the License.
<log4j2.version>2.18.0</log4j2.version>
<project.scm.id>github</project.scm.id>
<doris.shade.version>1.0.1</doris.shade.version>
+ <gcs.version>hadoop2-2.2.15</gcs.version>
<netty.version>4.1.89.Final</netty.version>
<owasp-dependency-check-maven.version>8.1.1</owasp-dependency-check-maven.version>
@@ -305,6 +306,13 @@ under the License.
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
+ <!-- gcs connector-->
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcs-connector</artifactId>
+ <version>${gcs.version}</version>
+ <classifier>shaded</classifier>
+ </dependency>
            <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
@@ -365,7 +373,7 @@ under the License.
<configuration>
<executable>cp</executable>
<arguments>
-                            <argument>${doris.home}/gensrc/thrift/PaloBrokerService.thrift</argument>
+                            <argument>${palo.home}/gensrc/thrift/PaloBrokerService.thrift</argument>
<argument>${basedir}/src/main/resources/thrift/</argument>
</arguments>
<skip>${skip.plugin}</skip>
@@ -395,6 +403,15 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**/gcs-connector-${gcs.version}.jar</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
<!-- copy all dependency libs to target lib dir -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index f6e27bd0ef..6cc63efde2 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -77,6 +77,12 @@ public class FileSystemManager {
private static final String AFS_SCHEME = "afs";
private static final String GFS_SCHEME = "gfs";
+ private static final String GCS_SCHEME = "gs";
+
+ private static final String FS_PREFIX = "fs.";
+
+ private static final String GCS_PROJECT_ID_KEY = "fs.gs.project.id";
+
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
private static final String AUTHENTICATION_SIMPLE = "simple";
@@ -224,6 +230,8 @@ public class FileSystemManager {
brokerFileSystem = getJuiceFileSystem(path, properties);
} else if (scheme.equals(GFS_SCHEME)) {
brokerFileSystem = getGooseFSFileSystem(path, properties);
+ } else if (scheme.equals(GCS_SCHEME)) {
+ brokerFileSystem = getGCSFileSystem(path, properties);
} else {
            throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                    "invalid path. scheme is not supported");
@@ -475,6 +483,44 @@ public class FileSystemManager {
}
}
+ /**
+ * get GCS file system
+ * @param path gcs path
+     * @param properties See {@link https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/branch-2.2.x/gcs/CONFIGURATION.md}
+ * @return GcsBrokerFileSystem
+ */
+    public BrokerFileSystem getGCSFileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String projectId = properties.get(GCS_PROJECT_ID_KEY);
+ if (Strings.isNullOrEmpty(projectId)) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+ "missed fs.gs.project.id configuration");
+ }
+ String ugi = "";
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(projectId, ugi);
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
+ try {
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("create file system for new path " + path);
+ // create a new filesystem
+ Configuration conf = new Configuration();
+ properties.forEach((k, v) -> {
+                    if (!Strings.isNullOrEmpty(v) && k.startsWith(FS_PREFIX)) {
+ conf.set(k, v);
+ }
+ });
+                FileSystem gcsFileSystem = FileSystem.get(pathUri.getUri(), conf);
+ fileSystem.setFileSystem(gcsFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
/**
     * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
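
As a usage note, here is a hedged sketch of the property map a broker caller might pass so that getGCSFileSystem() above can populate its Hadoop Configuration. fs.gs.project.id is the key the method requires; the remaining key names and values are assumptions taken from the gcs-connector documentation and may differ by connector version.

// Illustrative sketch only: an example property map for getGCSFileSystem().
// fs.gs.project.id is required by the method above; the other keys are
// assumptions based on the gcs-connector docs and may vary by version.
import java.util.HashMap;
import java.util.Map;

public class GcsBrokerPropertiesSketch {

    public static Map<String, String> buildGcsProperties() {
        Map<String, String> properties = new HashMap<>();
        // Required: getGCSFileSystem() rejects the request without it.
        properties.put("fs.gs.project.id", "my-gcp-project");
        // Only keys starting with the "fs." prefix are copied into the
        // Hadoop Configuration by the method above.
        properties.put("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
        // Assumed auth key from the 2.2.x connector docs (verify for your version).
        properties.put("fs.gs.auth.service.account.json.keyfile", "/path/to/service-account.json");
        return properties;
    }

    public static void main(String[] args) {
        buildGcsProperties().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
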
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]