This is an automated email from the ASF dual-hosted git repository. bertty pushed a commit to branch s3 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit f232a5ed35da32a549baaa6ea7db962b2ea1f483 Author: Bertty Contreras-Rojas <[email protected]> AuthorDate: Tue Jul 5 12:58:38 2022 +0200 [WAYANG-S3] support for s3 in apache wayang Signed-off-by: bertty <[email protected]> --- pom.xml | 2 +- wayang-benchmark/pom.xml | 5 + wayang-commons/pom.xml | 8 + wayang-commons/wayang-core/pom.xml | 22 +- .../apache/wayang/core/util/fs/FileSystems.java | 3 +- .../apache/wayang/core/util/fs/S3FileSystem.java | 232 +++++++++++++++++++++ .../wayang/spark/platform/SparkPlatform.java | 17 +- wayang-platforms/wayang-spark/pom.xml | 12 +- 8 files changed, 296 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 36ab3ede..b506f3b7 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ <mockito.version>3.5.10</mockito.version> <mockk.version>1.10.0</mockk.version> <external.platforms.scope>provided</external.platforms.scope> - <hadoop.version>2.7.7</hadoop.version> + <hadoop.version>3.1.2</hadoop.version> <!-- To be overridden by individual modules --> <java-module-name>org.apache.wayang.default</java-module-name> <code.coverage.project.folder>${basedir}/</code.coverage.project.folder> diff --git a/wayang-benchmark/pom.xml b/wayang-benchmark/pom.xml index f230ee09..7df47adf 100644 --- a/wayang-benchmark/pom.xml +++ b/wayang-benchmark/pom.xml @@ -59,6 +59,11 @@ <artifactId>wayang-sqlite3</artifactId> <version>0.6.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>3.1.2</version> + </dependency> </dependencies> <modules> diff --git a/wayang-commons/pom.xml b/wayang-commons/pom.xml index fdb0e20f..29941082 100644 --- a/wayang-commons/pom.xml +++ b/wayang-commons/pom.xml @@ -81,6 +81,10 @@ <groupId>com.thoughtworks.paranamer</groupId> <artifactId>paranamer</artifactId> </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ 
-165,6 +169,10 @@ <groupId>com.thoughtworks.paranamer</groupId> <artifactId>paranamer</artifactId> </exclusion> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> </exclusions> </dependency> <dependency> diff --git a/wayang-commons/wayang-core/pom.xml b/wayang-commons/wayang-core/pom.xml index 780c04ad..b170a49d 100644 --- a/wayang-commons/wayang-core/pom.xml +++ b/wayang-commons/wayang-core/pom.xml @@ -37,7 +37,17 @@ <properties> <java-module-name>org.apache.wayang.core</java-module-name> </properties> - + <dependencyManagement> + <dependencies> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-bom</artifactId> + <version>1.12.253</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> <dependencies> <dependency> <groupId>org.yaml</groupId> @@ -93,6 +103,16 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.12.253</version> + </dependency> + <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + <version>4.5.13</version> + </dependency> </dependencies> </project> diff --git a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java index 21ae249e..087a64d2 100644 --- a/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java +++ b/wayang-commons/wayang-core/src/main/java/org/apache/wayang/core/util/fs/FileSystems.java @@ -45,7 +45,8 @@ public class FileSystems { private static Collection<FileSystem> registeredFileSystems = Arrays.asList( new LocalFileSystem(), - new HadoopFileSystem() + new HadoopFileSystem(), + new S3FileSystem() ); private FileSystems() { diff 
/*
 * S3FileSystem.java — new file introduced by the [WAYANG-S3] patch.
 * NOTE(review): the diff/e-mail wrapping of the original message was removed and
 * the code reformatted; as an ASF source file it still needs the standard
 * Apache License header before merge.
 */
package org.apache.wayang.core.util.fs;

import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.wayang.core.api.exception.WayangException;

/**
 * {@link FileSystem} implementation backed by Amazon S3.
 * <p>
 * URLs of the form {@code s3://bucket/key} and {@code s3a://bucket/key} are
 * accepted. Credentials are taken from the system properties
 * {@code fs.s3.awsAccessKeyId} / {@code fs.s3.awsSecretAccessKey} when both are
 * set, otherwise from the AWS default credential chain (environment variables,
 * profile files, IAM instance roles).
 * <p>
 * NOTE(review): not thread-safe — the URL cache is a plain {@link HashMap};
 * confirm whether {@code FileSystems} only uses this from one thread.
 */
public class S3FileSystem implements FileSystem {

    /** Client used for every S3 operation; created once in the constructor. */
    final AmazonS3 s3;

    /** Cache of already-parsed URL -> (bucket, key) pairs. */
    final Map<String, S3Pair> pairs = new HashMap<>();

    public S3FileSystem() {
        // BUGFIX: Properties.contains() checks *values* (Hashtable semantics),
        // so explicitly configured credentials were silently ignored. We must
        // test for the presence of the *keys*.
        if (System.getProperties().containsKey("fs.s3.awsAccessKeyId")
                && System.getProperties().containsKey("fs.s3.awsSecretAccessKey")) {
            BasicAWSCredentials awsCreds = new BasicAWSCredentials(
                    System.getProperty("fs.s3.awsAccessKeyId"),
                    System.getProperty("fs.s3.awsSecretAccessKey")
            );
            this.s3 = AmazonS3ClientBuilder.standard()
                    .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                    .build();
        } else {
            // Fall back to the AWS default credential/region chain.
            this.s3 = AmazonS3ClientBuilder.defaultClient();
        }
    }

    /**
     * Manual smoke test against a developer bucket; not part of the public API.
     */
    public static void main(String... args) throws IOException {
        S3FileSystem s3 = new S3FileSystem();
        String url = "s3://blossom-benchmark/lulu/lolo/lala";
        System.out.println(url);
        System.out.println(s3.getS3Pair(url).getBucket());
        System.out.println(s3.getS3Pair(url).getKey());
        System.out.println(s3.preFoldersExits(s3.getS3Pair(url)));

        try (OutputStream output = s3.create(url, true)) {
            byte[] bytes = "lala".getBytes();
            output.write(bytes);
            output.flush();
        }
    }

    /**
     * Parsed (bucket, key) pair of an S3 URL.
     * Made a {@code static} nested class: it never uses the enclosing instance,
     * so it should not hold a hidden reference to it.
     */
    static class S3Pair {

        private final String bucket;
        private final String key;

        S3Pair(S3FileSystem s3Client, String url) {
            if (!s3Client.canHandle(url)) {
                // BUGFIX: the original message was ungrammatical and reported
                // the inner class name instead of the file system's.
                throw new WayangException(String.format(
                        "The URL '%s' can not be handled by %s",
                        url, S3FileSystem.class.getSimpleName()));
            }
            // "s3://bucket/key" splits into ["s3:", "", "bucket", "key"];
            // a bare bucket URL ("s3://bucket") has no key part.
            String[] parts = url.split("/", 4);
            this.bucket = parts[2];
            this.key = parts.length == 4 ? parts[3] : "";
        }

        S3Pair(String bucket, String key) {
            this.bucket = bucket;
            this.key = key;
        }

        public String getBucket() {
            return bucket;
        }

        public String getKey() {
            return key;
        }
    }

    /** Parses {@code url}, memoizing the result. */
    private S3Pair getS3Pair(String url) {
        return this.pairs.computeIfAbsent(url, u -> new S3Pair(this, u));
    }

    @Override
    public long getFileSize(String fileUrl) throws FileNotFoundException {
        return this.getFileSize(this.getS3Pair(fileUrl));
    }

    private long getFileSize(S3Pair pair) throws FileNotFoundException {
        try {
            return this.s3.getObjectMetadata(pair.getBucket(), pair.getKey()).getContentLength();
        } catch (SdkClientException e) {
            // Map SDK failures onto the declared checked exception instead of
            // leaking an AWS runtime exception to callers. (This also covers
            // auth/network errors — acceptable under the declared contract.)
            FileNotFoundException fnf = new FileNotFoundException(String.format(
                    "Could not access s3://%s/%s", pair.getBucket(), pair.getKey()));
            fnf.initCause(e);
            throw fnf;
        }
    }

    @Override
    public boolean canHandle(String url) {
        // BUGFIX: substring(0, 5) threw StringIndexOutOfBoundsException for
        // short URLs, and only "s3a:/" was accepted even though the class is
        // used with "s3://" URLs (see main()). Accept both schemes.
        String lower = url.toLowerCase();
        return lower.startsWith("s3://") || lower.startsWith("s3a://");
    }

    @Override
    public InputStream open(String url) throws IOException {
        return this.open(this.getS3Pair(url));
    }

    private InputStream open(S3Pair pair) throws IOException {
        return this.s3.getObject(pair.getBucket(), pair.getKey()).getObjectContent();
    }

    @Override
    public OutputStream create(String url) throws IOException {
        return this.create(this.getS3Pair(url));
    }

    private OutputStream create(S3Pair pair) throws IOException {
        return this.create(pair, false);
    }

    @Override
    public OutputStream create(String url, Boolean forceCreateParentDirs) throws IOException {
        return this.create(this.getS3Pair(url), forceCreateParentDirs);
    }

    /**
     * Opens a write stream to {@code pair}. The upload runs on a background
     * thread because {@code putObject} reads from the piped input stream while
     * the caller writes into the returned output stream — doing both on one
     * thread would deadlock.
     */
    private OutputStream create(S3Pair pair, Boolean forceCreateParentDirs) throws IOException {
        if (!forceCreateParentDirs && !this.preFoldersExits(pair)) {
            throw new IOException(String.format(
                    "The folder '%s' does not exist in the bucket '%s'",
                    pair.getKey(),
                    pair.getBucket()
            ));
        }

        PipedInputStream in = new PipedInputStream();
        final PipedOutputStream out = new PipedOutputStream(in);

        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setContentType("text/plain");
        AmazonS3 s3Client = this.s3;
        Thread uploader = new Thread(() -> {
            try {
                s3Client.putObject(pair.getBucket(), pair.getKey(), in, metadata);
            } catch (RuntimeException e) {
                // BUGFIX: failures were silently swallowed, leaving the writer
                // blocked on the pipe forever. Close the pipe so the writing
                // side fails fast, then surface the error (reaches the thread's
                // default uncaught-exception handler).
                try {
                    in.close();
                } catch (IOException ignored) {
                    // Nothing more we can do while cleaning up.
                }
                throw new WayangException(String.format(
                        "Upload to s3://%s/%s failed", pair.getBucket(), pair.getKey()), e);
            }
        }, "wayang-s3-upload");
        uploader.start();
        return out;
    }

    /** @return whether the bucket of {@code pair} exists. (Name kept — "Exits" [sic] — for API compatibility.) */
    public boolean bucketExits(S3Pair pair) {
        return this.s3.doesBucketExistV2(pair.getBucket());
    }

    /**
     * Checks that every *parent* folder of the pair's key exists.
     */
    public boolean preFoldersExits(S3Pair pair) {
        if (!this.s3.doesBucketExistV2(pair.getBucket())) return false;
        String[] keys = pair.getKey().split("/");
        // BUGFIX: the original prefixed every aggregated key with '/', which
        // never matches real S3 keys, and it also required the target object
        // itself to exist as a directory — so creating any new object with
        // forceCreateParentDirs=false always failed. Only the parent path
        // components (all but the last) must exist.
        StringBuilder aggregated = new StringBuilder();
        for (int i = 0; i < keys.length - 1; i++) {
            if (aggregated.length() > 0) {
                aggregated.append('/');
            }
            aggregated.append(keys[i]);
            if (!isDirectory(new S3Pair(pair.getBucket(), aggregated.toString()))) {
                return false;
            }
        }
        return true;
    }

    @Override
    public boolean isDirectory(String url) {
        return this.isDirectory(this.getS3Pair(url));
    }

    /**
     * S3 has no real folders: a key is treated as a directory when at least one
     * object other than the key itself exists under that prefix.
     */
    private boolean isDirectory(S3Pair pair) {
        if (!this.bucketExits(pair)) return false;

        String key = pair.getKey();
        return listChildren(pair).stream().anyMatch(name -> !name.equals(key));
    }

    @Override
    public Collection<String> listChildren(String url) {
        return this.listChildren(this.getS3Pair(url));
    }

    /**
     * Lists the keys under the pair's prefix.
     * NOTE(review): listObjects returns at most one page (~1000 keys); large
     * prefixes would need pagination via listNextBatchOfObjects — confirm
     * whether that matters for the intended workloads.
     */
    private Collection<String> listChildren(S3Pair pair) {
        ObjectListing listing = this.s3.listObjects(pair.getBucket(), pair.getKey());
        return listing.getObjectSummaries().stream()
                .map(obj -> obj.getKey())
                .collect(Collectors.toList());
    }

    @Override
    public boolean delete(String url, boolean isRecursiveDelete) throws IOException {
        return this.delete(this.getS3Pair(url), isRecursiveDelete);
    }

    /**
     * Deletes the object behind {@code pair}. Without {@code isRecursiveDelete}
     * the call refuses to touch directory-like prefixes.
     * NOTE(review): even with isRecursiveDelete=true only the single key is
     * deleted, not its children — confirm whether that is intended.
     */
    private boolean delete(S3Pair pair, boolean isRecursiveDelete) throws IOException {
        if (!isRecursiveDelete) {
            if (isDirectory(pair)) {
                throw new IOException("the path correspond to a directory");
            }
        }
        this.s3.deleteObject(pair.getBucket(), pair.getKey());
        return true;
    }
}
OPTIONAL_HADOOP_PROPERTIES = { + "fs.s3.awsAccessKeyId", + "fs.s3.awsSecretAccessKey" }; /** @@ -121,6 +126,7 @@ public class SparkPlatform extends Platform { "There is already a SparkContext (master: {}): , which will be reused. " + "Not all settings might be effective.", sparkContext.getConf().get("spark.master")); sparkConf = sparkContext.getConf(); + } else { sparkConf = new SparkConf(true); } @@ -133,6 +139,7 @@ public class SparkPlatform extends Platform { value -> sparkConf.set(property, value) ); } + if (job.getName() != null) { sparkConf.set("spark.app.name", job.getName()); } @@ -142,6 +149,14 @@ public class SparkPlatform extends Platform { } final JavaSparkContext sparkContext = this.sparkContextReference.get(); + org.apache.hadoop.conf.Configuration hadoopconf = sparkContext.hadoopConfiguration(); + for (String property: OPTIONAL_HADOOP_PROPERTIES){ + System.out.println(property); + configuration.getOptionalStringProperty(property).ifPresent( + value -> hadoopconf.set(property, value) + ); + } + // Set up the JAR files. //sparkContext.clearJars(); if (!sparkContext.isLocal()) { diff --git a/wayang-platforms/wayang-spark/pom.xml b/wayang-platforms/wayang-spark/pom.xml index 32c3625f..2fcfbfc2 100644 --- a/wayang-platforms/wayang-spark/pom.xml +++ b/wayang-platforms/wayang-spark/pom.xml @@ -71,7 +71,17 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> - <version>2.7.7</version> + <version>3.1.2</version> + </dependency> + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.12.253</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + <version>3.1.2</version> </dependency> </dependencies>
