This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch storage in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 64077c4de49de3a7e2c3594908b4b9a869945050 Author: benjobs <[email protected]> AuthorDate: Mon Aug 21 20:24:31 2023 +0800 module improvement --- pom.xml | 9 +- .../scala/org/apache/commons/cli/CommandLine.java | 2 + streampark-storage/pom.xml | 81 ------------------ .../apache/streampark/storage/StorageService.java | 28 ------- .../streampark/storage/StorageServiceConfig.java | 20 ----- .../apache/streampark/storage/oss/OssConfig.java | 31 ------- .../streampark/storage/oss/OssStorageService.java | 95 ---------------------- .../storage/oss/OssStorageServiceTest.java | 42 ---------- 8 files changed, 9 insertions(+), 299 deletions(-) diff --git a/pom.xml b/pom.xml index 2706cabc1..5d88186c0 100644 --- a/pom.xml +++ b/pom.xml @@ -78,8 +78,6 @@ <modules> <module>streampark-common</module> <module>streampark-flink</module> - <module>streampark-spark</module> - <module>streampark-storage</module> <module>streampark-console</module> </modules> @@ -858,6 +856,13 @@ </plugins> </build> </profile> + + <profile> + <id>spark</id> + <modules> + <module>streampark-spark</module> + </modules> + </profile> </profiles> </project> diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java new file mode 100644 index 000000000..7b8c1db94 --- /dev/null +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/commons/cli/CommandLine.java @@ -0,0 +1,2 @@ +package org.apache.commons.cli;public class CommandLine { +} diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml deleted file mode 100644 index d014c983b..000000000 --- a/streampark-storage/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.streampark</groupId> - <artifactId>streampark</artifactId> - <version>2.2.0-SNAPSHOT</version> - </parent> - - <artifactId>streampark-storage</artifactId> - <name>StreamPark : Storage Service</name> - - <properties> - <maven.compiler.source>8</maven.compiler.source> - <maven.compiler.target>8</maven.compiler.target> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <aliyun.oss.version>3.15.0</aliyun.oss.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <version>${aliyun.oss.version}</version> - <exclusions> - <exclusion> - <groupId>org.jdom</groupId> - <artifactId>jdom2</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <!--test--> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> - -</project> diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java deleted file mode 100644 index e0ff3201c..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage; - -/** - * StorageService will be used as artifacts fetcher in pod template, so it should rely on other - * modules. - */ -public interface StorageService { - void getData(String objectPath, String localFilePath) throws Exception; - - void putData(String objectPath, String localFilePath) throws Exception; -} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java deleted file mode 100644 index 8e17e33d1..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage; - -public abstract class StorageServiceConfig {} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java deleted file mode 100644 index 986e98f73..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import org.apache.streampark.storage.StorageServiceConfig; - -import lombok.Data; - -@Data -public class OssConfig extends StorageServiceConfig { - private String accessKeyId; - private String accessKeySecret; - private String endpoint; - private String bucket; - private String baseUri; -} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java deleted file mode 100644 index 98cffa270..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import org.apache.streampark.storage.StorageService; - -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.model.GetObjectRequest; -import com.aliyun.oss.model.PutObjectRequest; -import com.google.common.annotations.VisibleForTesting; -import lombok.extern.slf4j.Slf4j; - -import java.io.File; - -@Slf4j -public class OssStorageService implements StorageService { - - final OssConfig ossConfig; - final OSS ossClient; - - public OssStorageService(OssConfig config) { - this.ossConfig = config; - this.ossClient = - new OSSClientBuilder() - .build( - ossConfig.getEndpoint(), - ossConfig.getAccessKeyId(), - ossConfig.getAccessKeySecret()); - } - - @Override - public void getData(String objectPath, String localFilePath) throws Exception { - String bucket = ossConfig.getBucket(); - - if (!ossClient.doesObjectExist(bucket, objectPath)) { - throw new RuntimeException(String.format("File '%s' not exist", objectPath)); - } - - try { - ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath)); - } catch (Exception e) { - log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e); - throw handleOssException(e); - } - } - - @Override - public void putData(String objectPath, String localFilePath) throws Exception { - try { - PutObjectRequest putObjectRequest = - new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath)); - ossClient.putObject(putObjectRequest); - } catch (Exception e) { - log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e); - throw handleOssException(e); - } - } - - @VisibleForTesting - static RuntimeException handleOssException(Exception e) { - if (e instanceof OSSException) { - OSSException oe = (OSSException) e; - String errMsg = - String.format( - "Caught an OSSException. Error Message: %s." + " Error Code: %s. Request ID: %s", - oe.getErrorMessage(), oe.getErrorCode(), oe.getRequestId()); - return new RuntimeException(errMsg, oe); - } else if (e instanceof ClientException) { - ClientException ce = (ClientException) e; - String errMsg = - String.format("Caught an ClientException. Error Message: %s.", ce.getMessage()); - return new RuntimeException(errMsg, ce); - } else { - return new RuntimeException(e); - } - } -} diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java deleted file mode 100644 index 48ab4d939..000000000 --- a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSSException; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -class OssStorageServiceTest { - - @Test - void testHandleException() { - OSSException ossException = - new OSSException( - "mock error", "MOCK_CODE", "requestId", "hostId", "header", "resource", "GET"); - RuntimeException exp = OssStorageService.handleOssException(ossException); - Assertions.assertEquals( - "Caught an OSSException. Error Message: mock error. Error Code: MOCK_CODE. Request ID: requestId", - exp.getMessage()); - - ClientException ossClientException = new ClientException("Client ERROR"); - exp = OssStorageService.handleOssException(ossClientException); - Assertions.assertTrue( - exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR")); - } -}
