Repository: apex-malhar Updated Branches: refs/heads/master 7f1abca75 -> d5c24dc8e
APEXMALHAR-2416 Implementation of Redshift output module Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d5c24dc8 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d5c24dc8 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d5c24dc8 Branch: refs/heads/master Commit: d5c24dc8e283688f76fd41b3a57f9a2c1464593b Parents: 7f1abca Author: chaitanya <[email protected]> Authored: Wed Mar 15 16:13:39 2017 +0530 Committer: chaitanya <[email protected]> Committed: Wed Mar 15 16:13:39 2017 +0530 ---------------------------------------------------------------------- ...stractJdbcTransactionableOutputOperator.java | 8 +- ...dshiftJdbcTransactionableOutputOperator.java | 296 ++++++++++++++++ .../lib/db/redshift/RedshiftOutputModule.java | 345 +++++++++++++++++++ .../apex/malhar/lib/fs/s3/S3Reconciler.java | 97 +++--- .../malhar/lib/fs/s3/S3TupleOutputModule.java | 32 +- .../RedshiftJdbcTransactionalOperatorTest.java | 178 ++++++++++ .../apex/malhar/lib/fs/s3/S3ReconcilerTest.java | 5 +- 7 files changed, 899 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java index 0a7f3fd..df22e6f 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcTransactionableOutputOperator.java @@ -68,9 +68,9 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> @Min(1) private int batchSize; - private final List<T> tuples; + protected final List<T> tuples; - private transient int batchStartIdx; + protected transient int batchStartIdx; private transient PreparedStatement updateCommand; @OutputPortFieldAnnotation(optional = true) @@ -139,7 +139,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> } } - private void processBatch() + protected void processBatch() { logger.debug("start {} end {}", batchStartIdx, tuples.size()); try { @@ -165,7 +165,7 @@ public abstract class AbstractJdbcTransactionableOutputOperator<T> * @param updateCounts * @param commandsInBatch */ - private void processUpdateCounts(int[] updateCounts, int commandsInBatch) + protected void processUpdateCounts(int[] updateCounts, int commandsInBatch) { if (updateCounts.length < commandsInBatch) { // Driver chose not to continue processing after failure. 
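Note on the visibility relaxations above: tuples, batchStartIdx, processBatch() and processUpdateCounts() move from private to protected so that subclasses can batch work without a PreparedStatement. A minimal sketch of such a subclass (class name and String tuple type are hypothetical; it mirrors the pattern the Redshift operator below uses):

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;

import com.datatorrent.api.Context;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;

// Hypothetical subclass: each tuple is already a complete SQL command string and is
// batched through a plain java.sql.Statement instead of a PreparedStatement.
public class RawSqlBatchOutputOperator extends AbstractJdbcTransactionableOutputOperator<String>
{
  private transient Statement stmt;

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);
    try {
      stmt = store.getConnection().createStatement();
    } catch (SQLException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void activate(Context.OperatorContext context)
  {
    // No PreparedStatement to prepare; the base implementation would call getUpdateCommand().
  }

  @Override
  protected String getUpdateCommand()
  {
    throw new UnsupportedOperationException("statements are built in processBatch()");
  }

  @Override
  protected void setStatementParameters(PreparedStatement statement, String tuple)
  {
    throw new UnsupportedOperationException("no parameterized statement is used");
  }

  @Override
  protected void processBatch()
  {
    try {
      for (int i = batchStartIdx; i < tuples.size(); i++) {
        stmt.addBatch(tuples.get(i));
      }
      stmt.executeBatch();
      stmt.clearBatch();
      batchStartIdx = tuples.size();
    } catch (SQLException e) {
      throw new RuntimeException("processing batch", e);
    }
  }
}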
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java new file mode 100644 index 0000000..286e852 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionableOutputOperator.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.db.redshift; + +import java.sql.BatchUpdateException; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; +import com.sun.tools.javac.util.Assert; + +import com.datatorrent.api.Context; +import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator; + +/** + * A concrete implementation of AbstractJdbcTransactionableOutputOperator for Redshift which takes FSRecordCompactionOperator.OutputMetaData as input. + * It loads data into the specified Redshift table from data files. The files can be located either in S3 or on an Amazon EMR cluster. + * Specify the bucketName property if the files are located in S3, or the emrClusterId property if the files are located in EMR. + * By default, files are loaded from S3 into the Redshift table. If the files are located in EMR, set the readerMode property to READ_FROM_EMR.
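+ * <p>
+ * For illustration only (table, bucket, key and file values here are hypothetical), the command built by generateCopyStatement() has roughly this shape:
+ * <pre>
+ * COPY my_table FROM 's3://my-bucket/path/file.0' CREDENTIALS 'aws_access_key_id=AKIA...;aws_secret_access_key=...' region 'us-west-2' DELIMITER '|';
+ * </pre>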
+ * + * @displayName Redshift Output Operator + * @category Output + * @tags database, jdbc, redshift + */ [email protected] +@OperatorAnnotation(partitionable = false) +public class RedshiftJdbcTransactionableOutputOperator extends AbstractJdbcTransactionableOutputOperator<FSRecordCompactionOperator.OutputMetaData> +{ + private static final Logger logger = LoggerFactory.getLogger(RedshiftJdbcTransactionableOutputOperator.class); + protected static final String DEFAULT_REDSHIFT_DELIMITER = "|"; + @NotNull + private String tableName; + @NotNull + private String accessKey; + @NotNull + private String secretKey; + @NotNull + private String redshiftDelimiter = DEFAULT_REDSHIFT_DELIMITER; + private String region; + @NotNull + private RedshiftOutputModule.READER_MODE readerMode; + private String emrClusterId; + private String bucketName; + protected transient Statement stmt; + + @Override + public void setup(Context.OperatorContext context) + { + if (readerMode == RedshiftOutputModule.READER_MODE.READ_FROM_S3) { + Assert.checkNonNull(bucketName); + } else { + Assert.checkNonNull(emrClusterId); + } + super.setup(context); + try { + stmt = store.getConnection().createStatement(); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + protected String getUpdateCommand() + { + throw new UnsupportedOperationException("Unsupported Operation"); + } + + // Preparedstatement is not needed for uploading data into redshift. So, nothing to be done in activate state. + @Override + public void activate(Context.OperatorContext context) + { + } + + /** + * Create the copy statement from the specified OutputMetaData + * @param data Given OutputMetaData + * @return the copy statement + */ + + protected String generateCopyStatement(FSRecordCompactionOperator.OutputMetaData data) + { + String file = data.getPath(); + StringBuilder exec = new StringBuilder(); + exec.append("COPY " + tableName + " "); + if (readerMode == RedshiftOutputModule.READER_MODE.READ_FROM_S3) { + exec.append("FROM 's3://" + bucketName + "/" + file + "' "); + } else { + exec.append("FROM 'emr://" + emrClusterId + "/" + file + "' "); + } + exec.append("CREDENTIALS 'aws_access_key_id=" + accessKey); + exec.append(";aws_secret_access_key=" + secretKey + "' "); + if (region != null) { + exec.append("region '" + region + "' "); + } + exec.append("DELIMITER '" + redshiftDelimiter + "'"); + exec.append(";"); + return exec.toString(); + } + + @Override + protected void processBatch() + { + logger.debug("start {} end {}", batchStartIdx, tuples.size()); + try { + for (int i = batchStartIdx; i < tuples.size(); i++) { + String copyStmt = generateCopyStatement(tuples.get(i)); + stmt.addBatch(copyStmt); + } + stmt.executeBatch(); + stmt.clearBatch(); + batchStartIdx += tuples.size() - batchStartIdx; + } catch (BatchUpdateException bue) { + logger.error(bue.getMessage()); + processUpdateCounts(bue.getUpdateCounts(), tuples.size() - batchStartIdx); + } catch (SQLException e) { + throw new RuntimeException("processing batch", e); + } + } + + @Override + protected void setStatementParameters(PreparedStatement statement, FSRecordCompactionOperator.OutputMetaData tuple) throws SQLException + { + throw new UnsupportedOperationException("Unsupported Operation"); + } + + /** + * Get the table name from database + * @return tableName + */ + public String getTableName() + { + return tableName; + } + + /** + * Set the name of the table as it stored in redshift + * @param tableName table name + */ + public void setTableName(@NotNull String 
tableName) + { + this.tableName = Preconditions.checkNotNull(tableName); + } + + /** + * Get the AWS Access key + * @return accessKey + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the AWS Access Key + * @param accessKey given accessKey + */ + public void setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Get the AWS secret key + * @return secretKey + */ + public String getSecretKey() + { + return secretKey; + } + + /** + * Set the AWS secret key + * @param secretKey secretkey + */ + public void setSecretKey(@NotNull String secretKey) + { + this.secretKey = Preconditions.checkNotNull(secretKey); + } + + /** + * Return the delimiter character which is used to separate fields from input file. + * @return redshiftDelimiter + */ + public String getRedshiftDelimiter() + { + return redshiftDelimiter; + } + + /** + * Set the delimiter character which is used to separate fields from input file. + * @param redshiftDelimiter given redshiftDelimiter + */ + public void setRedshiftDelimiter(@NotNull String redshiftDelimiter) + { + this.redshiftDelimiter = Preconditions.checkNotNull(redshiftDelimiter); + } + + /** + * Get the AWS region from where the input file resides. + * @return region + */ + public String getRegion() + { + return region; + } + + /** + * Set the AWS region from where the input file resides. + * @param region region + */ + public void setRegion(String region) + { + this.region = region; + } + + /** + * Specifies whether the input files read from S3 or emr + * @return mode + */ + public String getReaderMode() + { + return readerMode.toString(); + } + + /** + * Set the readFromS3 which indicates whether the input files read from S3 or emr + * @param readerMode Type of reader mode + */ + public void setReaderMode(@Pattern(regexp = "READ_FROM_S3|READ_FROM_EMR", flags = Pattern.Flag.CASE_INSENSITIVE) String readerMode) + { + this.readerMode = RedshiftOutputModule.READER_MODE.valueOf(readerMode); + } + + /** + * Return the emrClusterId if the input files are located in EMR. + * @return emrClusterId + */ + public String getEmrClusterId() + { + return emrClusterId; + } + + /** + * Set the emrClusterId if the input files are located in EMR. + * @param emrClusterId emrClusterId + */ + public void setEmrClusterId(String emrClusterId) + { + this.emrClusterId = emrClusterId; + } + + /** + * Get the bucket name only if the input files are located in S3. + * @return bucketName. + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the bucket name only if the input files are located in S3. + * @param bucketName bucketName + */ + public void setBucketName(String bucketName) + { + this.bucketName = bucketName; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java new file mode 100644 index 0000000..7d24ef9 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/db/redshift/RedshiftOutputModule.java @@ -0,0 +1,345 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.db.redshift; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Pattern; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator; +import org.apache.apex.malhar.lib.fs.s3.S3TupleOutputModule; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static org.apache.apex.malhar.lib.db.redshift.RedshiftJdbcTransactionableOutputOperator.DEFAULT_REDSHIFT_DELIMITER; +import static org.apache.apex.malhar.lib.db.redshift.RedshiftOutputModule.READER_MODE.READ_FROM_S3; + +/** + * RedshiftOutputModule loads data into a Redshift table. Incoming data is first written to intermediate rolling files on HDFS/S3, and + * those files are then loaded into the Redshift table using the COPY command. + * By default, files are loaded from S3 into the Redshift table. If the files are located in EMR, set the readerMode property to READ_FROM_EMR.
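+ * <p>
+ * For illustration (operator names as added by populateDAG below), the two possible topologies are:
+ * <pre>
+ *   READ_FROM_S3 : input -> S3Compaction (S3TupleOutputModule.S3BytesOutputModule) -> LoadToRedshift (RedshiftJdbcTransactionableOutputOperator)
+ *   READ_FROM_EMR: input -> WriteToHDFS (FSRecordCompactionOperator)               -> LoadToRedshift (RedshiftJdbcTransactionableOutputOperator)
+ * </pre>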
+ * + */ [email protected] +public class RedshiftOutputModule implements Module +{ + @NotNull + private String tableName; + @NotNull + private String accessKey; + @NotNull + private String secretKey; + private String region; + private String bucketName; + private String directoryName; + private String emrClusterId; + @NotNull + private String redshiftDelimiter = DEFAULT_REDSHIFT_DELIMITER; + protected static enum READER_MODE + { + READ_FROM_S3, READ_FROM_EMR; + } + + private READER_MODE readerMode = READ_FROM_S3; + private int batchSize = 100; + private Long maxLengthOfRollingFile; + private JdbcTransactionalStore store = new JdbcTransactionalStore(); + + public final transient ProxyInputPort<byte[]> input = new ProxyInputPort<byte[]>(); + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + if (readerMode == READ_FROM_S3) { + S3TupleOutputModule.S3BytesOutputModule tupleBasedS3 = dag.addModule("S3Compaction", new S3TupleOutputModule.S3BytesOutputModule()); + tupleBasedS3.setAccessKey(accessKey); + tupleBasedS3.setSecretAccessKey(secretKey); + tupleBasedS3.setBucketName(bucketName); + tupleBasedS3.setOutputDirectoryPath(directoryName); + if (maxLengthOfRollingFile != null) { + tupleBasedS3.setMaxLength(maxLengthOfRollingFile); + } + input.set(tupleBasedS3.input); + + RedshiftJdbcTransactionableOutputOperator redshiftOutput = dag.addOperator("LoadToRedshift", createRedshiftOperator()); + + dag.addStream("load-to-redshift", tupleBasedS3.output, redshiftOutput.input); + } else { + FSRecordCompactionOperator<byte[]> hdfsWriteOperator = dag.addOperator("WriteToHDFS", new FSRecordCompactionOperator<byte[]>()); + hdfsWriteOperator.setConverter(new GenericFileOutputOperator.NoOpConverter()); + if (maxLengthOfRollingFile != null) { + hdfsWriteOperator.setMaxLength(maxLengthOfRollingFile); + } + input.set(hdfsWriteOperator.input); + + RedshiftJdbcTransactionableOutputOperator redshiftOutput = dag.addOperator("LoadToRedshift", createRedshiftOperator()); + + dag.addStream("load-to-redshift", hdfsWriteOperator.output, redshiftOutput.input); + } + } + + /** + * Create the RedshiftJdbcTransactionableOutputOperator instance + * @return RedshiftJdbcTransactionableOutputOperator object + */ + protected RedshiftJdbcTransactionableOutputOperator createRedshiftOperator() + { + RedshiftJdbcTransactionableOutputOperator redshiftOutput = new RedshiftJdbcTransactionableOutputOperator(); + redshiftOutput.setAccessKey(accessKey); + redshiftOutput.setSecretKey(secretKey); + if (bucketName != null) { + redshiftOutput.setBucketName(bucketName); + } + redshiftOutput.setTableName(tableName); + if (emrClusterId != null) { + redshiftOutput.setEmrClusterId(emrClusterId); + } + redshiftOutput.setReaderMode(readerMode.toString()); + redshiftOutput.setStore(store); + redshiftOutput.setBatchSize(batchSize); + redshiftOutput.setRedshiftDelimiter(redshiftDelimiter); + if (region != null) { + redshiftOutput.setRegion(region); + } + return redshiftOutput; + } + + /** + * Get the table name from database + * @return tableName + */ + public String getTableName() + { + return tableName; + } + + /** + * Set the name of the table as it stored in redshift + * @param tableName given tableName + */ + public void setTableName(@NotNull String tableName) + { + this.tableName = Preconditions.checkNotNull(tableName); + } + + /** + * Get the AWS Access key + * @return accessKey + */ + public String getAccessKey() + { + return accessKey; + } + + /** + * Set the AWS Access Key + * @param accessKey accessKey + */ + public void 
setAccessKey(@NotNull String accessKey) + { + this.accessKey = Preconditions.checkNotNull(accessKey); + } + + /** + * Get the AWS secret key + * @return secretKey + */ + public String getSecretKey() + { + return secretKey; + } + + /** + * Set the AWS secret key + * @param secretKey secretKey + */ + public void setSecretKey(@NotNull String secretKey) + { + this.secretKey = Preconditions.checkNotNull(secretKey); + } + + /** + * Get the AWS region from where the input file resides. + * @return region + */ + public String getRegion() + { + return region; + } + + /** + * Set the AWS region from where the input file resides. + * This is mandatory property if S3/EMR and Redshift runs in different regions. + * @param region given region + */ + public void setRegion(String region) + { + this.region = region; + } + + /** + * Get the bucket name only if the input files are located in S3. + * @return bucketName + */ + public String getBucketName() + { + return bucketName; + } + + /** + * Set the bucket name only if the input files are located in S3. + * @param bucketName bucketName + */ + public void setBucketName(@NotNull String bucketName) + { + this.bucketName = Preconditions.checkNotNull(bucketName); + } + + /** + * Return the directory name under S3 bucket + * @return directoryName + */ + public String getDirectoryName() + { + return directoryName; + } + + /** + * Set the directory name under S3 bucket. + * @param directoryName directoryName + */ + public void setDirectoryName(@NotNull String directoryName) + { + this.directoryName = Preconditions.checkNotNull(directoryName); + } + + /** + * Get the EMR cluster id + * @return emrClusterId + */ + public String getEmrClusterId() + { + return emrClusterId; + } + + /** + * Set the EMR cluster id + * @param emrClusterId emrClusterId + */ + public void setEmrClusterId(@NotNull String emrClusterId) + { + this.emrClusterId = Preconditions.checkNotNull(emrClusterId); + } + + /** + * Return the delimiter character which is used to separate fields from input file. + * @return redshiftDelimiter + */ + public String getRedshiftDelimiter() + { + return redshiftDelimiter; + } + + /** + * Set the delimiter character which is used to separate fields from input file. + * @param redshiftDelimiter redshiftDelimiter + */ + public void setRedshiftDelimiter(@NotNull String redshiftDelimiter) + { + this.redshiftDelimiter = Preconditions.checkNotNull(redshiftDelimiter); + } + + /** + * Specifies whether the input files read from S3 or emr + * @return readerMode + */ + public String getReaderMode() + { + return readerMode.toString(); + } + + /** + * Set the readFromS3 which indicates whether the input files read from S3 or emr + * @param readerMode Type of reader mode + */ + public void setReaderMode(@Pattern(regexp = "READ_FROM_S3|READ_FROM_EMR", flags = Pattern.Flag.CASE_INSENSITIVE) String readerMode) + { + this.readerMode = RedshiftOutputModule.READER_MODE.valueOf(readerMode); + } + + /** + * Get the size of a batch operation. + * @return batchSize + */ + public int getBatchSize() + { + return batchSize; + } + + /** + * Sets the size of a batch operation. + * @param batchSize batchSize + */ + public void setBatchSize(int batchSize) + { + this.batchSize = batchSize; + } + + /** + * Get the maximum length in bytes of a rolling file. + * @return maxLengthOfRollingFile + */ + public Long getMaxLengthOfRollingFile() + { + return maxLengthOfRollingFile; + } + + /** + * Set the maximum length in bytes of a rolling file. 
+ * @param maxLengthOfRollingFile maxLengthOfRollingFile + */ + public void setMaxLengthOfRollingFile(Long maxLengthOfRollingFile) + { + this.maxLengthOfRollingFile = maxLengthOfRollingFile; + } + + /** + * Get the JdbcTransactionalStore of a RedshiftJdbcTransactionableOutputOperator + * @return JdbcTransactionalStore + */ + public JdbcTransactionalStore getStore() + { + return store; + } + + /** + * Set the JdbcTransactionalStore + * @param store store + */ + public void setStore(@NotNull JdbcTransactionalStore store) + { + this.store = Preconditions.checkNotNull(store); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java index 5fd19f9..0c47e26 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3Reconciler.java @@ -35,15 +35,19 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.lib.io.fs.AbstractReconciler; /** @@ -74,9 +78,9 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator. private String bucketName; /** - * S3 End point + * S3 Region */ - private String endPoint; + private String region; /** * Directory name under S3 bucket @@ -98,10 +102,15 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator. private static final String TMP_EXTENSION = ".tmp"; + public final transient DefaultOutputPort<FSRecordCompactionOperator.OutputMetaData> outputPort = new DefaultOutputPort<>(); + @Override public void setup(Context.OperatorContext context) { s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); + if (region != null) { + s3client.setRegion(Region.getRegion(Regions.fromName(region))); + } filePath = context.getValue(DAG.APPLICATION_PATH); try { fs = FileSystem.newInstance(new Path(filePath).toUri(), new Configuration()); @@ -144,9 +153,8 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator. throw new RuntimeException("PutRequestSize greater than Integer.MAX_VALUE"); } if (fs.exists(path)) { - logger.debug("Trying to upload : {}", path); - s3client.putObject(request); - logger.debug("Uploading : {}", keyName); + PutObjectResult result = s3client.putObject(request); + logger.debug("File {} Uploaded at {}", keyName, result.getETag()); } } catch (FileNotFoundException e) { logger.debug("Ignoring non-existent path assuming replay : {}", outputMetaData.getPath()); @@ -161,36 +169,48 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator. 
@Override public void endWindow() { - logger.info("in endWindow()"); while (doneTuples.peek() != null) { FSRecordCompactionOperator.OutputMetaData metaData = doneTuples.poll(); - logger.debug("found metaData = {}", metaData); - committedTuples.remove(metaData); - try { - Path dest = new Path(metaData.getPath()); - //Deleting the intermediate files and when writing to tmp files - // there can be vagrant tmp files which we have to clean - FileStatus[] statuses = fs.listStatus(dest.getParent()); - - for (FileStatus status : statuses) { - String statusName = status.getPath().getName(); - if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) { - //a tmp file has tmp extension always preceded by timestamp - String actualFileName = statusName.substring(0, - statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); - logger.debug("actualFileName = {}", actualFileName); - if (metaData.getFileName().equals(actualFileName)) { - logger.debug("deleting stray file {}", statusName); - fs.delete(status.getPath(), true); - } - } else if (statusName.equals(metaData.getFileName())) { - logger.info("deleting s3-compaction file {}", statusName); + removeIntermediateFiles(metaData); + /*if (outputPort.isConnected()) { + // Emit the meta data with S3 path + metaData.setPath(getDirectoryName() + Path.SEPARATOR + metaData.getFileName()); + outputPort.emit(metaData); + }*/ + } + } + + /** + * Remove intermediate files + */ + protected void removeIntermediateFiles(FSRecordCompactionOperator.OutputMetaData metaData) + { + logger.debug("found metaData = {}", metaData); + committedTuples.remove(metaData); + try { + Path dest = new Path(metaData.getPath()); + //Deleting the intermediate files and when writing to tmp files + // there can be vagrant tmp files which we have to clean + FileStatus[] statuses = fs.listStatus(dest.getParent()); + + for (FileStatus status : statuses) { + String statusName = status.getPath().getName(); + if (statusName.endsWith(TMP_EXTENSION) && statusName.startsWith(metaData.getFileName())) { + //a tmp file has tmp extension always preceded by timestamp + String actualFileName = statusName.substring(0, + statusName.lastIndexOf('.', statusName.lastIndexOf('.') - 1)); + logger.debug("actualFileName = {}", actualFileName); + if (metaData.getFileName().equals(actualFileName)) { + logger.debug("deleting stray file {}", statusName); fs.delete(status.getPath(), true); } + } else if (statusName.equals(metaData.getFileName())) { + logger.info("deleting s3-compaction file {}", statusName); + fs.delete(status.getPath(), true); } - } catch (IOException e) { - logger.error("Unable to Delete a file: {}", metaData.getFileName()); } + } catch (IOException e) { + logger.error("Unable to Delete a file: {}", metaData.getFileName()); } } @@ -279,24 +299,21 @@ public class S3Reconciler extends AbstractReconciler<FSRecordCompactionOperator. 
} /** - * Return the S3 End point - * - * @return S3 End point + * Get the AWS S3 Region + * @return region */ - public String getEndPoint() + public String getRegion() { - return endPoint; + return region; } /** - * Set the S3 End point - * - * @param endPoint - * S3 end point + * Set the AWS S3 Region + * @param region region */ - public void setEndPoint(String endPoint) + public void setRegion(String region) { - this.endPoint = endPoint; + this.region = region; } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java index 59cd046..7e907b1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/s3/S3TupleOutputModule.java @@ -52,6 +52,7 @@ import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; public abstract class S3TupleOutputModule<INPUT> implements Module { public final transient ProxyInputPort<INPUT> input = new ProxyInputPort<INPUT>(); + public final transient ProxyOutputPort<FSRecordCompactionOperator.OutputMetaData> output = new ProxyOutputPort<>(); /** * AWS access key @@ -65,9 +66,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module private String secretAccessKey; /** - * S3 End point + * S3 Region */ - private String endPoint; + private String region; /** * Name of the bucket in which to upload the files */ @@ -144,7 +145,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module s3Reconciler.setAccessKey(accessKey); s3Reconciler.setSecretKey(secretAccessKey); s3Reconciler.setBucketName(bucketName); - s3Reconciler.setEndPoint(endPoint); + if (region != null) { + s3Reconciler.setRegion(region); + } s3Reconciler.setDirectoryName(outputDirectoryPath); S3ReconcilerQueuePartitioner<S3Reconciler> reconcilerPartitioner = new S3ReconcilerQueuePartitioner<S3Reconciler>(); @@ -157,11 +160,9 @@ public abstract class S3TupleOutputModule<INPUT> implements Module Arrays.asList(new StatsListener[] {reconcilerPartitioner})); dag.setAttribute(s3Reconciler, OperatorContext.PARTITIONER, reconcilerPartitioner); - if (endPoint != null) { - s3Reconciler.setEndPoint(endPoint); - } dag.addStream("write-to-s3", s3compaction.output, s3Reconciler.input); input.set(s3compaction.input); + output.set(s3Reconciler.outputPort); } /** @@ -228,24 +229,21 @@ public abstract class S3TupleOutputModule<INPUT> implements Module } /** - * Return the S3 End point - * - * @return S3 End point + * Get the S3 Region + * @return region */ - public String getEndPoint() + public String getRegion() { - return endPoint; + return region; } /** - * Set the S3 End point - * - * @param endPoint - * S3 end point + * Set the AWS S3 region + * @param region region */ - public void setEndPoint(String endPoint) + public void setRegion(String region) { - this.endPoint = Preconditions.checkNotNull(endPoint); + this.region = region; } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java ---------------------------------------------------------------------- diff --git 
a/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java new file mode 100644 index 0000000..3d4496c --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/db/redshift/RedshiftJdbcTransactionalOperatorTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.db.redshift; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.apache.apex.malhar.lib.fs.FSRecordCompactionOperator; +import org.apache.commons.io.FileUtils; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore; + +import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; +import static org.mockito.Mockito.when; + +public class RedshiftJdbcTransactionalOperatorTest +{ + private String inputDir; + private static final String FILE_1 = "file1.txt"; + private static final String FILE_2 = "file2.txt"; + private static final String FILE_1_DATA = "460|xkalk|665\n950|xkalk|152\n850|xsblk|252"; + private static final String FILE_2_DATA = "640|xkalk|655\n50|bcklk|52"; + private static FSRecordCompactionOperator.OutputMetaData file1Meta; + private static FSRecordCompactionOperator.OutputMetaData file2Meta; + private static List<FSRecordCompactionOperator.OutputMetaData> listOfFiles = new ArrayList<>(); + private static List<String> data = new ArrayList<>(); + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + Context.OperatorContext context; + @Mock + public Statement statement; + @Mock + public JdbcTransactionalStore store; + @Mock + public Connection conn; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + + MockitoAnnotations.initMocks(this); + + try { + when(store.getConnection()).thenReturn(conn); + when(conn.createStatement()).thenReturn(statement); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File(baseDirectory)); + } catch (IOException e) { + throw new 
RuntimeException(e); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() throws Exception + { + inputDir = testMeta.baseDirectory + File.separator + "input"; + + File file1 = new File(inputDir + File.separator + FILE_1); + file1Meta = new FSRecordCompactionOperator.OutputMetaData(file1.getPath(), file1.getName(), file1.length()); + FileUtils.writeStringToFile(file1, FILE_1_DATA); + + File file2 = new File(inputDir + File.separator + FILE_2); + file2Meta = new FSRecordCompactionOperator.OutputMetaData(file2.getPath(), file2.getName(), file2.length()); + FileUtils.writeStringToFile(file2, FILE_2_DATA); + } + + @Test + public void TestBatchData() throws SQLException, IOException + { + RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator(); + operator.setReaderMode("READ_FROM_S3"); + operator.setStore(testMeta.store); + operator.setAccessKey("accessKey"); + operator.setSecretKey("secretKey"); + operator.setBucketName("bucketName"); + + Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500); + attributeMap.put(Context.DAGContext.APPLICATION_PATH, testMeta.baseDirectory); + testMeta.context = mockOperatorContext(1, attributeMap);; + + operator.setup(testMeta.context); + operator.beginWindow(1); + operator.input.process(file1Meta); + operator.input.process(file2Meta); + when(testMeta.statement.executeBatch()).thenReturn(executeBatch()); + operator.endWindow(); + Assert.assertEquals("Number of tuples in database", 5, data.size()); + } + + public int[] executeBatch() throws IOException + { + for (FSRecordCompactionOperator.OutputMetaData metaData: listOfFiles) { + data.addAll(FileUtils.readLines(new File(metaData.getPath()))); + } + return null; + } + + @Test + public void VerifyS3Properties() + { + RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator(); + operator.setReaderMode("READ_FROM_S3"); + operator.setAccessKey("accessKey"); + operator.setSecretKey("secretKey"); + operator.setBucketName("bucketName"); + + Assert.assertNotNull(operator.getBucketName()); + } + + @Test + public void VerifyEMRProperties() + { + RedshiftJdbcTransactionableTestOutputOperator operator = new RedshiftJdbcTransactionableTestOutputOperator(); + operator.setReaderMode("READ_FROM_EMR"); + operator.setAccessKey("accessKey"); + operator.setSecretKey("secretKey"); + operator.setEmrClusterId("emrClusterId"); + + Assert.assertNotNull(operator.getEmrClusterId()); + } + + public static class RedshiftJdbcTransactionableTestOutputOperator extends RedshiftJdbcTransactionableOutputOperator + { + @Override + public void processTuple(FSRecordCompactionOperator.OutputMetaData tuple) + { + super.processTuple(tuple); + listOfFiles.add(tuple); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5c24dc8/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java index c276df7..9023b5c 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/s3/S3ReconcilerTest.java @@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path; import 
com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; @@ -78,7 +79,9 @@ public class S3ReconcilerTest underTest.setup(context); MockitoAnnotations.initMocks(this); - when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(null); + PutObjectResult result = new PutObjectResult(); + result.setETag(outputPath); + when(s3clientMock.putObject((PutObjectRequest)any())).thenReturn(result); underTest.setS3client(s3clientMock); }
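For reference, a minimal application sketch showing how the new module could be wired into a DAG. Class, operator and property values below are hypothetical, and the JDBC driver class and URL are assumptions about a typical Redshift JDBC setup, not part of this module:

import org.apache.apex.malhar.lib.db.redshift.RedshiftOutputModule;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.util.BaseOperator;

public class RedshiftLoadApplication implements StreamingApplication
{
  // Trivial example source: emits one fixed pipe-delimited record per call (illustration only).
  public static class SampleByteSource extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>();

    @Override
    public void emitTuples()
    {
      output.emit("460|xkalk|665".getBytes());
    }
  }

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    SampleByteSource source = dag.addOperator("Source", new SampleByteSource());

    RedshiftOutputModule redshift = dag.addModule("RedshiftLoad", new RedshiftOutputModule());
    redshift.setAccessKey("ACCESS_KEY");           // hypothetical AWS credentials
    redshift.setSecretKey("SECRET_KEY");
    redshift.setBucketName("my-bucket");           // S3 bucket holding the intermediate rolling files
    redshift.setDirectoryName("redshift-staging"); // directory under the bucket
    redshift.setTableName("my_table");             // target Redshift table
    redshift.setReaderMode("READ_FROM_S3");        // default; use READ_FROM_EMR for EMR-resident files
    // JDBC connection details for the module's JdbcTransactionalStore (hypothetical endpoint):
    redshift.getStore().setDatabaseDriver("com.amazon.redshift.jdbc.Driver");
    redshift.getStore().setDatabaseUrl("jdbc:redshift://my-cluster.example.us-west-2.redshift.amazonaws.com:5439/dev");

    dag.addStream("records", source.output, redshift.input);
  }
}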
