Repository: apex-malhar Updated Branches: refs/heads/master 2fe2903bf -> 10dd94ef5
APEXMALHAR-2278.KuduNonTransactionalOutputOperator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/10dd94ef Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/10dd94ef Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/10dd94ef Branch: refs/heads/master Commit: 10dd94ef56e81a795a9a7295e74b686ffd79b255 Parents: 2fe2903 Author: Ananth <[email protected]> Authored: Thu May 4 06:19:05 2017 +1000 Committer: Ananth <[email protected]> Committed: Thu May 4 06:19:05 2017 +1000 ---------------------------------------------------------------------- contrib/pom.xml | 6 + .../kudu/AbstractKuduOutputOperator.java | 660 +++++++++++++++++++ .../malhar/contrib/kudu/ApexKuduConnection.java | 223 +++++++ .../contrib/kudu/BaseKuduOutputOperator.java | 158 +++++ .../contrib/kudu/KuduExecutionContext.java | 109 +++ .../malhar/contrib/kudu/KuduMutationType.java | 32 + ...uduCreateUpdateDeleteOutputOperatorTest.java | 336 ++++++++++ .../contrib/kudu/SimpleKuduOutputOperator.java | 52 ++ .../malhar/contrib/kudu/UnitTestTablePojo.java | 125 ++++ .../resources/kuduoutputoperator.properties | 22 + 10 files changed, 1723 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/pom.xml b/contrib/pom.xml index 893ec2d..83305cb 100755 --- a/contrib/pom.xml +++ b/contrib/pom.xml @@ -678,5 +678,11 @@ <artifactId>jackson-databind</artifactId> <version>2.7.0</version> </dependency> + <dependency> + <groupId>org.apache.kudu</groupId> + <artifactId>kudu-client</artifactId> + <version>1.3.0</version> + <optional>true</optional> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java new file mode 100644 index 0000000..250334b --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java @@ -0,0 +1,660 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.kudu; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.Delete; +import org.apache.kudu.client.Insert; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.Operation; +import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.Statistics; +import org.apache.kudu.client.Update; +import org.apache.kudu.client.Upsert; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.PojoUtils; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * An Abstract operator that would allow concrete implementations to write a POJO value into a given Kudu table. + * <p> + * To use this operator, the following needs to be done by the implementor + * <ol> + * <li>Create a concrete implementation of this operator and implement the method to define the connection + * properties to Kudu. The connection properties are set using the {@link ApexKuduConnection} using a builder pattern. + * </li> + * <li>Implement the logic how tuples need to ne handled in the event of a reconciliation phase ( i.e. when an + * operator is resuming back from failure ). See javadoc of the method for more details.</li> + * <li>Define the payload class</li> + * </ol> + * </p> + * <p> + * Note that the tuple that is getting processed is not the POJO class. The tuple that is getting processed is the + * {@link KuduExecutionContext} instance. This is because the operator supports mutation types as a higher level + * construct than simply writing a POJO into a Kudu table row. + * </p> + * <p> + * Supported mutations are: + * <ol> + * <li>INSERT</li> + * <li>UPDATE</li> + * <li>UPSERT</li> + * <li>DELETE</li> + * </ol> + * </p> + * <p> + * Please note that the Update mutation allows to change a subset of columns only even though there might be columns + * that were defined to be non-nullable. This is because the original mutation of type insert would have written the + * mandatory columns. In such scenarios, the method setDoNotWriteColumns() in {@link KuduExecutionContext} can be + * used to specify only those columns that need an update. 
This ways a read and Update pattern can be merged to a + * simple update pattern thus avoiding a read if required.</p> + * */ [email protected] +public abstract class AbstractKuduOutputOperator extends BaseOperator + implements Operator.ActivationListener<Context.OperatorContext>,Operator.CheckpointNotificationListener +{ + + private transient ApexKuduConnection apexKuduConnection; + + private transient KuduTable kuduTable; + + private transient KuduSession kuduSession; + + private transient KuduClient kuduClientHandle; + + private transient Map<String,ColumnSchema> allColumnDefs; + + private transient Map<String,Object> kuduColumnBasedGetters; + + private Set<String> primaryKeyColumnNames; + + private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class); + + @NotNull + private WindowDataManager windowDataManager; + + private transient long currentWindowId; + + private transient boolean isInReplayMode; + + private transient boolean isInReconcilingMode; + + private transient long reconcilingWindowId; + + @AutoMetric + transient long numInserts; + + @AutoMetric + transient long numUpserts; + + @AutoMetric + transient long numDeletes; + + @AutoMetric + transient long numUpdates; + + @AutoMetric + transient long numOpsErrors; + + @AutoMetric + transient long numBytesWritten; + + @AutoMetric + transient long numRpcErrors; + + @AutoMetric + transient long numWriteOps; + + @AutoMetric + transient long numWriteRPCs; + + @AutoMetric + long totalOpsErrors = 0; + + @AutoMetric + long totalBytesWritten = 0; + + @AutoMetric + long totalRpcErrors = 0; + + @AutoMetric + long totalWriteOps = 0; + + @AutoMetric + long totalWriteRPCs = 0; + + @AutoMetric + long totalInsertsSinceStart; + + @AutoMetric + long totalUpsertsSinceStart; + + @AutoMetric + long totalDeletesSinceStart; + + @AutoMetric + long totalUpdatesSinceStart; + + public final transient DefaultInputPort<KuduExecutionContext> input = new DefaultInputPort<KuduExecutionContext>() + { + @Override + public void process(KuduExecutionContext kuduExecutionContext) + { + processTuple(kuduExecutionContext); + } + }; // end input port implementation + + public void processTuple(KuduExecutionContext kuduExecutionContext) + { + if ( isInReconcilingMode || isInReplayMode) { + if ( !isEligibleForPassivationInReconcilingWindow(kuduExecutionContext, currentWindowId)) { + return; + } + } + KuduMutationType mutationType = kuduExecutionContext.getMutationType(); + switch (mutationType) { + case DELETE: + processForDelete(kuduExecutionContext); + numDeletes += 1; + totalDeletesSinceStart += 1; + break; + case INSERT: + processForInsert(kuduExecutionContext); + numInserts += 1; + totalInsertsSinceStart += 1; + break; + case UPDATE: + processForUpdate(kuduExecutionContext); + numUpdates += 1; + totalUpdatesSinceStart += 1; + break; + case UPSERT: + processForUpsert(kuduExecutionContext); + numUpserts += 1; + totalUpsertsSinceStart += 1; + break; + default: + break; + } + } + + /*** + * Sets the values from the Pojo into the Kudu mutation object. + * @param currentOperation The operation instance that represents the current mutation. 
This will be applied to the + * current session + * @param kuduExecutionContext The tuple that contains the payload as well as other information like mutation type etc + */ + @SuppressWarnings(value = "unchecked") + private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext) + { + currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode()); + currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp()); + PartialRow partialRow = currentOperation.getRow(); + Object payload = kuduExecutionContext.getPayload(); + Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns(); + if (doNotWriteColumns == null) { + doNotWriteColumns = new HashSet<>(); + } + for (String columnName: kuduColumnBasedGetters.keySet()) { + if ( doNotWriteColumns.contains(columnName)) { + continue; + } + ColumnSchema columnSchema = allColumnDefs.get(columnName); + Type dataType = columnSchema.getType(); + try { + switch (dataType) { + case STRING: + PojoUtils.Getter<Object, String> stringGetter = ((PojoUtils.Getter<Object, String>)kuduColumnBasedGetters + .get(columnName)); + if (stringGetter != null) { + final String stringValue = stringGetter.get(payload); + if (stringValue != null) { + partialRow.addString(columnName, stringValue); + } + } + break; + case BINARY: + PojoUtils.Getter<Object, ByteBuffer> byteBufferGetter = ((PojoUtils.Getter<Object, ByteBuffer>) + kuduColumnBasedGetters.get(columnName)); + if (byteBufferGetter != null) { + final ByteBuffer byteBufferValue = byteBufferGetter.get(payload); + if (byteBufferValue != null) { + partialRow.addBinary(columnName, byteBufferValue); + } + } + break; + case BOOL: + PojoUtils.GetterBoolean<Object> boolGetter = ((PojoUtils.GetterBoolean<Object>)kuduColumnBasedGetters.get( + columnName)); + if (boolGetter != null) { + final boolean boolValue = boolGetter.get(payload); + partialRow.addBoolean(columnName, boolValue); + } + break; + case DOUBLE: + PojoUtils.GetterDouble<Object> doubleGetter = ((PojoUtils.GetterDouble<Object>)kuduColumnBasedGetters.get( + columnName)); + if (doubleGetter != null) { + final double doubleValue = doubleGetter.get(payload); + partialRow.addDouble(columnName, doubleValue); + } + break; + case FLOAT: + PojoUtils.GetterFloat<Object> floatGetter = ((PojoUtils.GetterFloat<Object>)kuduColumnBasedGetters.get( + columnName)); + if (floatGetter != null) { + final float floatValue = floatGetter.get(payload); + partialRow.addFloat(columnName, floatValue); + } + break; + case INT8: + PojoUtils.GetterByte<Object> byteGetter = ((PojoUtils.GetterByte<Object>)kuduColumnBasedGetters.get( + columnName)); + if (byteGetter != null) { + final byte byteValue = byteGetter.get(payload); + partialRow.addByte(columnName, byteValue); + } + break; + case INT16: + PojoUtils.GetterShort<Object> shortGetter = ((PojoUtils.GetterShort<Object>)kuduColumnBasedGetters.get( + columnName)); + if (shortGetter != null) { + final short shortValue = shortGetter.get(payload); + partialRow.addShort(columnName, shortValue); + } + break; + case INT32: + PojoUtils.GetterInt<Object> intGetter = ((PojoUtils.GetterInt<Object>) + kuduColumnBasedGetters.get(columnName)); + if (intGetter != null) { + final int intValue = intGetter.get(payload); + partialRow.addInt(columnName, intValue); + } + break; + case INT64: + case UNIXTIME_MICROS: + PojoUtils.GetterLong<Object> longGetter = ((PojoUtils.GetterLong<Object>)kuduColumnBasedGetters.get( + columnName)); + if (longGetter != 
null) { + final long longValue = longGetter.get(payload); + partialRow.addLong(columnName, longValue); + } + break; + default: + LOG.error(columnName + " is not of the supported data type"); + throw new UnsupportedOperationException("Kudu does not support data type for column " + columnName); + } + } catch ( Exception ex ) { + LOG.error(" Exception while fetching the value of " + columnName + " because " + ex.getMessage()); + partialRow.setNull(columnName); + } + } + try { + kuduSession.apply(currentOperation); + } catch (KuduException e) { + LOG.error("Could not execute operation because " + e.getMessage(), e); + throw new RuntimeException(e.getMessage()); + } + } + + protected void processForUpdate(KuduExecutionContext kuduExecutionContext) + { + Update thisUpdate = kuduTable.newUpdate(); + performCommonProcessing(thisUpdate,kuduExecutionContext); + } + + + protected void processForUpsert(KuduExecutionContext kuduExecutionContext) + { + Upsert thisUpsert = kuduTable.newUpsert(); + performCommonProcessing(thisUpsert,kuduExecutionContext); + } + + + + protected void processForDelete(KuduExecutionContext kuduExecutionContext) + { + Delete thisDelete = kuduTable.newDelete(); + // Kudu does not allow column values to be set in case of a delete mutation + Set<String> doNotWriteCols = kuduExecutionContext.getDoNotWriteColumns(); + if ( doNotWriteCols == null) { + doNotWriteCols = new HashSet<>(); + } + doNotWriteCols.clear(); + for (String columnName : allColumnDefs.keySet()) { + if ( !(primaryKeyColumnNames.contains(columnName))) { + doNotWriteCols.add(columnName); + } + } + kuduExecutionContext.setDoNotWriteColumns(doNotWriteCols); + performCommonProcessing(thisDelete,kuduExecutionContext); + } + + + protected void processForInsert(KuduExecutionContext kuduExecutionContext) + { + Insert thisInsert = kuduTable.newInsert(); + performCommonProcessing(thisInsert,kuduExecutionContext); + } + + @Override + public void activate(Context.OperatorContext context) + { + ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder = getKuduConnectionConfig(); + apexKuduConnection = apexKuduConnectionBuilder.build(); + checkNotNull(apexKuduConnection,"Kudu connection cannot be null"); + kuduTable = apexKuduConnection.getKuduTable(); + kuduSession = apexKuduConnection.getKuduSession(); + kuduClientHandle = apexKuduConnection.getKuduClient(); + checkNotNull(kuduTable,"Kudu Table cannot be null"); + checkNotNull(kuduSession, "Kudu session cannot be null"); + allColumnDefs = new HashMap(); + primaryKeyColumnNames = new HashSet<>(); + kuduColumnBasedGetters = new HashMap(); + buildGettersForPojoPayload(); + reconcilingWindowId = Stateless.WINDOW_ID; + // The operator is working in a replay mode where the upstream buffer is re-streaming the tuples + // Note that there are two windows that need special core. 
The window that is being replayed and the subsequent + // window that might have resulted in a crash which we are referring as reconciling window + if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) && + context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < + windowDataManager.getLargestCompletedWindow()) { + reconcilingWindowId = windowDataManager.getLargestCompletedWindow() + 1; + } + + if ( (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID) && + context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) == + windowDataManager.getLargestCompletedWindow()) { + reconcilingWindowId = windowDataManager.getLargestCompletedWindow(); + } + } + + @Override + public void deactivate() + { + try { + apexKuduConnection.close(); + } catch (Exception e) { + LOG.error("Could not close kudu session and resources because " + e.getMessage(), e); + } + } + + @Override + public void beforeCheckpoint(long l) + { + // Nothing to be done here. Child classes can use this if required + } + + @Override + public void checkpointed(long l) + { + // Nothing to be done here. Child classes can use this if required + } + + @Override + public void committed(long windowId) + { + try { + windowDataManager.committed(windowId); + } catch (IOException e) { + LOG.error("Error while committing the window id " + windowId + " because " + e.getMessage()); + throw new RuntimeException(e); + } + } + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + windowDataManager = getWindowDataManager(); + if ( windowDataManager == null) { + windowDataManager = new FSWindowDataManager(); + } + windowDataManager.setup(context); + } + + @Override + public void teardown() + { + windowDataManager.teardown(); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + currentWindowId = windowId; + if ( currentWindowId != Stateless.WINDOW_ID) { // if it is not the first window of the application + if (currentWindowId > reconcilingWindowId) { + isInReplayMode = false; + isInReconcilingMode = false; + } + if (currentWindowId == reconcilingWindowId) { + isInReconcilingMode = true; + isInReplayMode = false; + } + if (currentWindowId < reconcilingWindowId) { + isInReconcilingMode = false; + isInReplayMode = true; + } + } + numDeletes = 0; + numInserts = 0; + numUpdates = 0; + numUpserts = 0; + } + + @Override + public void endWindow() + { + try { + kuduSession.flush(); + } catch (KuduException e) { + LOG.error("Could not flush kudu session on an end window boundary " + e.getMessage(), e); + throw new RuntimeException(e); + } + if ( currentWindowId > windowDataManager.getLargestCompletedWindow()) { + try { + windowDataManager.save(currentWindowId,currentWindowId); + } catch (IOException e) { + LOG.error("Error while persisting the current window state " + currentWindowId + " because " + e.getMessage()); + throw new RuntimeException(e); + } + } + numOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS) - + totalOpsErrors; + numBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN) - + totalBytesWritten; + numRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS) - + totalRpcErrors; + numWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS) - + totalWriteOps; + numWriteRPCs = 
kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS) - totalWriteOps; + totalOpsErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.OPS_ERRORS); + totalBytesWritten = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.BYTES_WRITTEN); + totalRpcErrors = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.RPC_ERRORS); + totalWriteOps = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_OPS); + totalWriteRPCs = kuduClientHandle.getStatistics().getClientStatistic(Statistics.Statistic.WRITE_RPCS); + } + + private void buildGettersForPojoPayload() + { + Class payloadClass = getTuplePayloadClass(); + checkNotNull(payloadClass,"Payload class cannot be null"); + Field[] classFields = payloadClass.getDeclaredFields(); + Schema schemaInfo = kuduTable.getSchema(); + List<ColumnSchema> allColumns = schemaInfo.getColumns(); + Set<String> allKuduTableColumnNames = new HashSet<>(); + Map<String,ColumnSchema> normalizedColumns = new HashMap(); + for ( ColumnSchema aColumnDef : allColumns) { + allColumnDefs.put(aColumnDef.getName(), aColumnDef); + normalizedColumns.put(aColumnDef.getName().toLowerCase(), aColumnDef); + allKuduTableColumnNames.add(aColumnDef.getName().toLowerCase()); + } + List<ColumnSchema> primaryKeyColumns = schemaInfo.getPrimaryKeyColumns(); + for (ColumnSchema primaryKeyInfo : primaryKeyColumns) { + primaryKeyColumnNames.add(primaryKeyInfo.getName()); + } + Map<String,String> columnNameOverrides = getOverridingColumnNameMap(); + if (columnNameOverrides == null) { + columnNameOverrides = new HashMap(); // to avoid null checks further down the line + } + for ( Field aFieldDef : classFields) { + String currentFieldName = aFieldDef.getName().toLowerCase(); + if (allKuduTableColumnNames.contains(currentFieldName)) { + extractGetterForColumn(normalizedColumns.get(currentFieldName),aFieldDef); + } else { + if (columnNameOverrides.containsKey(aFieldDef.getName())) { + extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName()).toLowerCase()), + aFieldDef); + } else if (columnNameOverrides.containsKey(aFieldDef.getName().toLowerCase())) { + extractGetterForColumn(normalizedColumns.get(columnNameOverrides.get(aFieldDef.getName().toLowerCase()) + .toLowerCase()),aFieldDef); + } + } + } + } + + /*** + * Used to build a getter for the given schema column from the POJO field definition + * @param columnSchema The Kudu column definition + * @param fieldDefinition The POJO field definition + */ + private void extractGetterForColumn(ColumnSchema columnSchema, Field fieldDefinition) + { + Type columnType = columnSchema.getType(); + Class pojoClass = getTuplePayloadClass(); + Object getter = null; + switch ( columnType ) { + case BINARY: + getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), ByteBuffer.class); + break; + case STRING: + getter = PojoUtils.createGetter(pojoClass, fieldDefinition.getName(), String.class); + break; + case BOOL: + getter = PojoUtils.createGetterBoolean(pojoClass, fieldDefinition.getName()); + break; + case DOUBLE: + getter = PojoUtils.createGetterDouble(pojoClass, fieldDefinition.getName()); + break; + case FLOAT: + getter = PojoUtils.createGetterFloat(pojoClass, fieldDefinition.getName()); + break; + case INT8: + getter = PojoUtils.createGetterByte(pojoClass, fieldDefinition.getName()); + break; + case INT16: + getter = PojoUtils.createGetterShort(pojoClass, fieldDefinition.getName()); + break; + 
case INT32: + getter = PojoUtils.createGetterInt(pojoClass, fieldDefinition.getName()); + break; + case INT64: + case UNIXTIME_MICROS: + getter = PojoUtils.createGetterLong(pojoClass, fieldDefinition.getName()); + break; + default: + LOG.error(fieldDefinition.getName() + " has a data type that is not yet supported"); + throw new UnsupportedOperationException(fieldDefinition.getName() + " does not have a compatible data type"); + } + if (getter != null) { + kuduColumnBasedGetters.put(columnSchema.getName(),getter); + } + } + + + public static ApexKuduConnection.ApexKuduConnectionBuilder usingKuduConnectionBuilder() + { + return new ApexKuduConnection.ApexKuduConnectionBuilder(); + } + + public WindowDataManager getWindowDataManager() + { + return windowDataManager; + } + + public void setWindowDataManager(WindowDataManager windowDataManager) + { + this.windowDataManager = windowDataManager; + } + + /*** + * Allows to map a POJO field name to a Kudu Table column name. This is useful in case + * the POJO field name can't be changed to an unconventional name ( ex: if kudu column names have underscores ). It + * can be also useful when the developer does not want to declare a new POJO but reuse an existing POJO. + * Note that the key in the map is the name of the field in the POJO and + * the value part is used to denote the name of the kudu column + * @return The map giving the mapping from POJO field name to the Kudu column name + */ + protected Map<String,String> getOverridingColumnNameMap() + { + return new HashMap<>(); + } + + abstract ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig(); + + /*** + * Represents the Tuple payload class that maps to a Kudu table row. Note that the POJO fields are mapped to the + * kudu column names is a lenient way. For example, the mapping of POJO field names to the kudu columns is done + * in a case-insensitive way. + * @return The class that will be used to map to a row in the given Kudu table. + */ + protected abstract Class getTuplePayloadClass(); + + /*** + * This is used to give control to the concrete implementation of this operator how to resolve whether to write a + * mutation into a given kudu table in the event of a failure and the operator subsequently resumes. This window + * is marked as a reconciling window. It is only for this reconciling window that we need to give control to the + * concrete operator implementor how to actually resolve if the entry needs to be excuted as a mutation in Kudu. + * @param executionContext The tuple which represents the execution context along with the payload + * @param reconcilingWindowId The window Id of the reconciling window + * @return true if we would like the entry to result in a mutation in the Kudu table. 
+ */ + protected abstract boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext, + long reconcilingWindowId); +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java new file mode 100644 index 0000000..aed6b8b --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.kudu; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.kudu.client.ExternalConsistencyMode; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduSession; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.SessionConfiguration; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + + +/** + * <p>Represents a connection to the Kudu cluster. 
An instance of this class is to be supplied (via a builder pattern to) + * {@link AbstractKuduOutputOperator} to connect to a Kudu cluster.</p> + */ + +public class ApexKuduConnection implements AutoCloseable, Serializable +{ + private static final long serialVersionUID = 4720185362997969198L; + + private transient KuduSession kuduSession; + + private transient KuduTable kuduTable; + + private transient KuduClient kuduClient; + + public static final Logger LOG = LoggerFactory.getLogger(ApexKuduConnection.class); + + + private ApexKuduConnection(ApexKuduConnectionBuilder builder) + { + checkNotNull(builder,"Builder cannot be null to establish kudu session"); + checkArgument(builder.mastersCollection.size() > 0, "Atleast one kudu master needs to be specified"); + checkNotNull(builder.tableName,"Kudu table cannot be null"); + KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(builder.mastersCollection); + if (builder.isOperationTimeOutSet) { + kuduClientBuilder.defaultOperationTimeoutMs(builder.operationTimeOutMs); + } + if (builder.isBossThreadCountSet) { + kuduClientBuilder.bossCount(builder.numBossThreads); + } + if (builder.isWorkerThreadsCountSet) { + kuduClientBuilder.workerCount(builder.workerThreads); + } + if (builder.isSocketReadTimeOutSet) { + kuduClientBuilder.defaultSocketReadTimeoutMs(builder.socketReadTimeOutMs); + } + kuduClient = kuduClientBuilder.build(); + kuduSession = kuduClient.newSession(); + if (builder.isFlushModeSet) { + kuduSession.setFlushMode(builder.flushMode); + } + if (builder.isExternalConsistencyModeSet) { + kuduSession.setExternalConsistencyMode(builder.externalConsistencyMode); + } + try { + if (!kuduClient.tableExists(builder.tableName)) { + throw new Exception("Table " + builder.tableName + " does not exist. 
"); + } else { + kuduTable = kuduClient.openTable(builder.tableName); + } + } catch (Exception e) { + LOG.error("Kudu table existence could not be ascertained " + e.getMessage()); + throw new RuntimeException(e.getMessage()); + } + } + + @Override + public void close() throws Exception + { + kuduSession.close(); + kuduClient.close(); + } + + public KuduSession getKuduSession() + { + return kuduSession; + } + + public void setKuduSession(KuduSession kuduSession) + { + this.kuduSession = kuduSession; + } + + public KuduTable getKuduTable() + { + return kuduTable; + } + + public void setKuduTable(KuduTable kuduTable) + { + this.kuduTable = kuduTable; + } + + public KuduClient getKuduClient() + { + return kuduClient; + } + + public void setKuduClient(KuduClient kuduClient) + { + this.kuduClient = kuduClient; + } + + public static class ApexKuduConnectionBuilder + { + List<String> mastersCollection = new ArrayList<>(); + + String tableName; + + // optional props + int numBossThreads = 1; + + boolean isBossThreadCountSet = false; + + int workerThreads = 2 * Runtime.getRuntime().availableProcessors(); + + boolean isWorkerThreadsCountSet = false; + + long socketReadTimeOutMs = 10000; + + boolean isSocketReadTimeOutSet = false; + + long operationTimeOutMs = 30000; + + boolean isOperationTimeOutSet = false; + + ExternalConsistencyMode externalConsistencyMode; + + boolean isExternalConsistencyModeSet = false; + + SessionConfiguration.FlushMode flushMode; + + boolean isFlushModeSet = false; + + public ApexKuduConnectionBuilder withTableName(String tableName) + { + this.tableName = tableName; + return this; + } + + public ApexKuduConnectionBuilder withAPossibleMasterHostAs(String masterHostAndPort) + { + mastersCollection.add(masterHostAndPort); + return this; + } + + public ApexKuduConnectionBuilder withNumberOfBossThreads(int numberOfBossThreads) + { + this.numBossThreads = numberOfBossThreads; + isBossThreadCountSet = true; + return this; + } + + public ApexKuduConnectionBuilder withNumberOfWorkerThreads(int numberOfWorkerThreads) + { + this.workerThreads = numberOfWorkerThreads; + isWorkerThreadsCountSet = true; + return this; + } + + public ApexKuduConnectionBuilder withSocketReadTimeOutAs(long socketReadTimeOut) + { + this.socketReadTimeOutMs = socketReadTimeOut; + isSocketReadTimeOutSet = true; + return this; + } + + public ApexKuduConnectionBuilder withOperationTimeOutAs(long operationTimeOut) + { + this.operationTimeOutMs = operationTimeOut; + isOperationTimeOutSet = true; + return this; + } + + public ApexKuduConnectionBuilder withExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode) + { + this.externalConsistencyMode = externalConsistencyMode; + isExternalConsistencyModeSet = true; + return this; + } + + public ApexKuduConnectionBuilder withFlushMode(SessionConfiguration.FlushMode flushMode) + { + this.flushMode = flushMode; + isFlushModeSet = true; + return this; + } + + protected ApexKuduConnection build() + { + ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this); + return apexKuduConnection; + } + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java new file mode 100644 index 0000000..6de7190 --- 
/dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.kudu; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.kudu.client.ExternalConsistencyMode; +import org.apache.kudu.client.SessionConfiguration; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Provides a default implementation for writing tuples as Kudu rows. + * The user will have to either provide for + * <ol> + * <li>a property file containing properties like Kudu master list, Kudu table name and other connection properties. + * The operator will fail to launch if the properties file named kuduoutputoperator.properties is not locatable in the + * root path</li> + * <li>Use the default constructor which supports minimum required properties as parameters</li> + * <li>In case of presence of multiple Kudu output operators in the same application, use the String based + * constructor which accepts a file name for each of the kudu table that the incoming pojo needs to be passivated + * to</li> + * </ol> + * <p> + * The properties file will have to consist of the following keys: + * <br>masterhosts=<ip1:host>,<ip2:host>,..# Comma separated</br> + * <br>tablename=akudutablename</br> + * <br>pojoclassname=somepojoclasswithgettersandsetters; # Do not append name with .class at the end and + * do not forget to give the complete class name including the package</br> + * </p> + */ +public class BaseKuduOutputOperator extends AbstractKuduOutputOperator +{ + public static final String DEFAULT_CONNECTION_PROPS_FILE_NAME = "kuduoutputoperator.properties"; + + public static final String TABLE_NAME = "tablename"; + + public static final String MASTER_HOSTS = "masterhosts"; + + public static final String POJO_CLASS_NAME = "pojoclassname"; + + private Class pojoPayloadClass; + + private ApexKuduConnection.ApexKuduConnectionBuilder apexKuduConnectionBuilder; + + public BaseKuduOutputOperator() throws IOException, ClassNotFoundException + { + initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME); + } + + public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException + { + initConnectionBuilderProperties(configFileInClasspath); + } + + private void initConnectionBuilderProperties(String configFileInClasspath) throws IOException, ClassNotFoundException + { + Properties kuduConnectionProperties = new Properties(); + ClassLoader loader = Thread.currentThread().getContextClassLoader(); + InputStream kuduPropsFileAsStream = 
loader.getResourceAsStream(configFileInClasspath); + if (kuduPropsFileAsStream != null) { + kuduConnectionProperties.load(kuduPropsFileAsStream); + } else { + throw new IOException("Properties file required for Kudu connection " + configFileInClasspath + + " is not locatable in the root classpath"); + } + String tableName = checkNotNull(kuduConnectionProperties.getProperty(TABLE_NAME)); + String pojoClassName = checkNotNull(kuduConnectionProperties.getProperty(POJO_CLASS_NAME)); + String masterHostsConnectionString = checkNotNull(kuduConnectionProperties.getProperty(MASTER_HOSTS)); + String[] masterAndHosts = masterHostsConnectionString.split(","); + pojoPayloadClass = Class.forName(pojoClassName); + initKuduConfig(tableName, Arrays.asList(masterAndHosts)); + } + + private void initKuduConfig(String kuduTableName, List<String> kuduMasters) + { + apexKuduConnectionBuilder = new ApexKuduConnection.ApexKuduConnectionBuilder() + .withTableName(kuduTableName) + .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT) + .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC) + .withNumberOfBossThreads(1) + .withNumberOfWorkerThreads(2) + .withSocketReadTimeOutAs(3000) + .withOperationTimeOutAs(3000); + for ( String aMasterAndHost: kuduMasters ) { + apexKuduConnectionBuilder = apexKuduConnectionBuilder.withAPossibleMasterHostAs(aMasterAndHost); + } + } + + + public BaseKuduOutputOperator(String kuduTableName,List<String> kuduMasters, Class pojoPayloadClass) + { + this.pojoPayloadClass = pojoPayloadClass; + initKuduConfig(kuduTableName,kuduMasters); + } + + @Override + ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig() + { + return apexKuduConnectionBuilder; + } + + /** + * Can be used to further fine tune any of the connection configs once the constructor completes instantiating the + * Kudu Connection config builder. + * @return The Connection Config that would be used to initiate a connection to the Kudu Cluster once the operator is + * deserialized in the node manager managed container. See {@link AbstractKuduOutputOperator} activate() method for + * more details. + */ + public ApexKuduConnection.ApexKuduConnectionBuilder getApexKuduConnectionBuilder() + { + return apexKuduConnectionBuilder; + } + + + /** + * + * @return The pojo class that would be streamed in the KuduExecutionContext + */ + @Override + protected Class getTuplePayloadClass() + { + return pojoPayloadClass; + } + + /** + * The default is to implement for ATLEAST_ONCE semantics. Override this control this behavior. 
+ * @param executionContext The tuple which represents the execution context along with the payload + * @param reconcilingWindowId The window Id of the reconciling window + * @return + */ + @Override + protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext, + long reconcilingWindowId) + { + return true; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java new file mode 100644 index 0000000..27d382d --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.kudu; + + + +import java.util.HashSet; +import java.util.Set; + +import org.apache.kudu.client.ExternalConsistencyMode; + +/** + * <p>Represents a summary of the mutation that needs to be done on the Kudu table. The type of mutation is + * decided by the KuduMutation Type field. The actual data that is mutated inside the kudu table row is + * represented by the payload. The execution context itself a templated class based on the payload class.</p> + */ +public class KuduExecutionContext<T> +{ + private T payload; + /*** + * <p>Represents the set of columns that are not to be written to a Kudu row. 
Note that this is useful when we + * would like to not write a set of columns into the table either because + * <ol> + * <li>We are doing an update and we would like to update only a few of the columns as the original columns were + * already written in the original insert mutation</li> + * <li>When we would like to not write a column because the column is an optional column as per the schema definition + * </li> + * </ol> + * It may be noted that the client driver will throw an exception when a mandatory column is not written.</p> + */ + private Set<String> doNotWriteColumns = new HashSet<>(); + + private KuduMutationType mutationType = KuduMutationType.UPSERT; + + private ExternalConsistencyMode externalConsistencyMode; + + private long propagatedTimestamp; + + public T getPayload() + { + return payload; + } + + public void setPayload(T payload) + { + this.payload = payload; + } + + public KuduMutationType getMutationType() + { + return mutationType; + } + + public void setMutationType(KuduMutationType mutationType) + { + this.mutationType = mutationType; + } + + public ExternalConsistencyMode getExternalConsistencyMode() + { + return externalConsistencyMode; + } + + public void setExternalConsistencyMode(ExternalConsistencyMode externalConsistencyMode) + { + this.externalConsistencyMode = externalConsistencyMode; + } + + public long getPropagatedTimestamp() + { + return propagatedTimestamp; + } + + public void setPropagatedTimestamp(long propagatedTimestamp) + { + this.propagatedTimestamp = propagatedTimestamp; + } + + public Set<String> getDoNotWriteColumns() + { + return doNotWriteColumns; + } + + public void setDoNotWriteColumns(Set<String> doNotWriteColumns) + { + this.doNotWriteColumns = doNotWriteColumns; + } + + public void addDoNotWriteColumn(String aKuduColumnName) + { + doNotWriteColumns.add(aKuduColumnName); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java new file mode 100644 index 0000000..64b46c6 --- /dev/null +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduMutationType.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.contrib.kudu; + +/** + * <p>Used in {@link KuduExecutionContext} to denote the type of mutation we would like to be executed for the + * mutation being represented by the current tuple</p> + */ +public enum KuduMutationType +{ + + INSERT, + DELETE, + UPDATE, + UPSERT +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java new file mode 100644 index 0000000..615427a --- /dev/null +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/KuduCreateUpdateDeleteOutputOperatorTest.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.contrib.kudu; + +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.Type; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.KuduException; +import org.apache.kudu.client.KuduPredicate; +import org.apache.kudu.client.KuduScanner; +import org.apache.kudu.client.KuduTable; +import org.apache.kudu.client.RowResult; +import org.apache.kudu.client.RowResultIterator; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.helper.TestPortContext; +import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; +import static org.junit.Assert.assertEquals; + + +public class KuduCreateUpdateDeleteOutputOperatorTest +{ + + + private static final transient Logger LOG = LoggerFactory.getLogger(KuduCreateUpdateDeleteOutputOperatorTest.class); + + private static final String tableName = "unittests"; + + private final String APP_ID = "TestKuduOutputOperator"; + + private final int OPERATOR_ID_FOR_KUDU_CRUD = 0; + + private static KuduClient kuduClient; + + private static KuduTable kuduTable; + + private static Map<String,ColumnSchema> columnDefs = new HashMap<>(); + + private BaseKuduOutputOperator 
simpleKuduOutputOperator; + + private OperatorContext contextForKuduOutputOperator; + + private TestPortContext testPortContextForKuduOutput; + + @BeforeClass + public static void setup() throws Exception + { + kuduClient = getClientHandle(); + if (kuduClient.tableExists(tableName)) { + kuduClient.deleteTable(tableName); + } + createTestTable(tableName,kuduClient); + kuduTable = kuduClient.openTable(tableName); + } + + @AfterClass + public static void shutdown() throws Exception + { + kuduClient.close(); + } + + @Before + public void setUpKuduOutputOperatorContext() throws Exception + { + Attribute.AttributeMap.DefaultAttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); + attributeMap.put(DAG.APPLICATION_ID, APP_ID); + contextForKuduOutputOperator = mockOperatorContext(OPERATOR_ID_FOR_KUDU_CRUD, attributeMap); + simpleKuduOutputOperator = new BaseKuduOutputOperator(); + Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + portAttributes.put(Context.PortContext.TUPLE_CLASS, UnitTestTablePojo.class); + testPortContextForKuduOutput = new TestPortContext(portAttributes); + simpleKuduOutputOperator.setup(contextForKuduOutputOperator); + simpleKuduOutputOperator.activate(contextForKuduOutputOperator); + simpleKuduOutputOperator.input.setup(testPortContextForKuduOutput); + } + + private static KuduClient getClientHandle() throws Exception + { + KuduClient.KuduClientBuilder builder = new KuduClient.KuduClientBuilder("localhost:7051"); + KuduClient client = builder.build(); + return client; + } + + private static void createTestTable(String tableName, KuduClient client) throws Exception + { + List<ColumnSchema> columns = new ArrayList<>(); + ColumnSchema intRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("introwkey", Type.INT32) + .key(true) + .build(); + columns.add(intRowKeyCol); + columnDefs.put("introwkey",intRowKeyCol); + ColumnSchema stringRowKeyCol = new ColumnSchema.ColumnSchemaBuilder("stringrowkey", Type.STRING) + .key(true) + .build(); + columns.add(stringRowKeyCol); + columnDefs.put("stringrowkey",stringRowKeyCol); + ColumnSchema timestampRowKey = new ColumnSchema.ColumnSchemaBuilder("timestamprowkey", Type.UNIXTIME_MICROS) + .key(true) + .build(); + columns.add(timestampRowKey); + columnDefs.put("timestamprowkey",timestampRowKey); + ColumnSchema longData = new ColumnSchema.ColumnSchemaBuilder("longdata", Type.INT64) + .build(); + columns.add(longData); + columnDefs.put("longdata",longData); + ColumnSchema stringData = new ColumnSchema.ColumnSchemaBuilder("stringdata", Type.STRING) + .build(); + columns.add(stringData); + columnDefs.put("stringdata",stringData); + ColumnSchema timestampdata = new ColumnSchema.ColumnSchemaBuilder("timestampdata", Type.UNIXTIME_MICROS) + .build(); + columns.add(timestampdata); + columnDefs.put("timestampdata",timestampdata); + ColumnSchema binarydata = new ColumnSchema.ColumnSchemaBuilder("binarydata", Type.BINARY) + .build(); + columns.add(binarydata); + columnDefs.put("binarydata",binarydata); + ColumnSchema floatdata = new ColumnSchema.ColumnSchemaBuilder("floatdata", Type.FLOAT) + .build(); + columns.add(floatdata); + columnDefs.put("floatdata",floatdata); + ColumnSchema booldata = new ColumnSchema.ColumnSchemaBuilder("booldata", Type.BOOL) + .build(); + columns.add(booldata); + columnDefs.put("booldata",booldata); + List<String> rangeKeys = new ArrayList<>(); + rangeKeys.add("stringrowkey"); + rangeKeys.add("timestamprowkey"); + List<String> hashPartitions = new 
ArrayList<>(); + hashPartitions.add("introwkey"); + Schema schema = new Schema(columns); + try { + client.createTable(tableName, schema, + new CreateTableOptions() + .setNumReplicas(1) + .setRangePartitionColumns(rangeKeys) + .addHashPartitions(hashPartitions,2)); + } catch (KuduException e) { + LOG.error("Error while creating table for unit tests " + e.getMessage(), e); + throw e; + } + } + + private void lookUpAndPopulateRecord(UnitTestTablePojo keyInfo) throws Exception + { + KuduScanner scanner = kuduClient.newScannerBuilder(kuduTable) + .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("introwkey"), + KuduPredicate.ComparisonOp.EQUAL,keyInfo.getIntrowkey())) + .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("stringrowkey"), + KuduPredicate.ComparisonOp.EQUAL,keyInfo.getStringrowkey())) + .addPredicate(KuduPredicate.newComparisonPredicate(columnDefs.get("timestamprowkey"), + KuduPredicate.ComparisonOp.EQUAL,keyInfo.getTimestamprowkey())) + .build(); + RowResultIterator rowResultItr = scanner.nextRows(); + while (rowResultItr.hasNext()) { + RowResult thisRow = rowResultItr.next(); + keyInfo.setFloatdata(thisRow.getFloat("floatdata")); + keyInfo.setBooldata(thisRow.getBoolean("booldata")); + keyInfo.setBinarydata(thisRow.getBinary("binarydata")); + keyInfo.setLongdata(thisRow.getLong("longdata")); + keyInfo.setTimestampdata(thisRow.getLong("timestampdata")); + keyInfo.setStringdata("stringdata"); + break; + } + } + + @Test + public void processForUpdate() throws Exception + { + KuduExecutionContext<UnitTestTablePojo> newInsertExecutionContext = new KuduExecutionContext<>(); + UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo(); + unitTestTablePojo.setIntrowkey(2); + unitTestTablePojo.setStringrowkey("two" + System.currentTimeMillis()); + unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis()); + unitTestTablePojo.setBooldata(true); + unitTestTablePojo.setFloatdata(3.2f); + unitTestTablePojo.setStringdata("" + System.currentTimeMillis()); + unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1); + unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2); + unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes())); + newInsertExecutionContext.setMutationType(KuduMutationType.INSERT); + newInsertExecutionContext.setPayload(unitTestTablePojo); + + simpleKuduOutputOperator.beginWindow(1); + simpleKuduOutputOperator.input.process(newInsertExecutionContext); + KuduExecutionContext<UnitTestTablePojo> updateExecutionContext = new KuduExecutionContext<>(); + UnitTestTablePojo updatingRecord = new UnitTestTablePojo(); + updateExecutionContext.setMutationType(KuduMutationType.UPDATE); + updatingRecord.setBooldata(false); + updatingRecord.setIntrowkey(unitTestTablePojo.getIntrowkey()); + updatingRecord.setStringrowkey(unitTestTablePojo.getStringrowkey()); + updatingRecord.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey()); + updateExecutionContext.setPayload(updatingRecord); + simpleKuduOutputOperator.input.process(updateExecutionContext); + simpleKuduOutputOperator.endWindow(); + + UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo(); + unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey()); + unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey()); + unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey()); + lookUpAndPopulateRecord(unitTestTablePojoRead); + assertEquals(unitTestTablePojoRead.isBooldata(), false); + } + + @Test + public 
void processForUpsert() throws Exception + { + KuduExecutionContext<UnitTestTablePojo> upsertExecutionContext = new KuduExecutionContext<>(); + UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo(); + unitTestTablePojo.setIntrowkey(3); + unitTestTablePojo.setStringrowkey("three" + System.currentTimeMillis()); + unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis()); + unitTestTablePojo.setBooldata(false); + unitTestTablePojo.setFloatdata(3.2f); + unitTestTablePojo.setStringdata("" + System.currentTimeMillis()); + unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1); + unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2); + unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes())); + upsertExecutionContext.setMutationType(KuduMutationType.UPSERT); + upsertExecutionContext.setPayload(unitTestTablePojo); + + simpleKuduOutputOperator.beginWindow(2); + simpleKuduOutputOperator.input.process(upsertExecutionContext); + upsertExecutionContext.setMutationType(KuduMutationType.UPSERT); + unitTestTablePojo.setBooldata(true); + simpleKuduOutputOperator.input.process(upsertExecutionContext); + simpleKuduOutputOperator.endWindow(); + + UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo(); + unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey()); + unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey()); + unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey()); + lookUpAndPopulateRecord(unitTestTablePojoRead); + assertEquals(unitTestTablePojoRead.isBooldata(), true); + } + + @Test + public void processForDelete() throws Exception + { + KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>(); + UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo(); + unitTestTablePojo.setIntrowkey(4); + unitTestTablePojo.setStringrowkey("four" + System.currentTimeMillis()); + unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis()); + unitTestTablePojo.setBooldata(false); + unitTestTablePojo.setFloatdata(3.2f); + unitTestTablePojo.setStringdata("" + System.currentTimeMillis()); + unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1); + unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2); + unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes())); + insertExecutionContext.setMutationType(KuduMutationType.INSERT); + insertExecutionContext.setPayload(unitTestTablePojo); + + simpleKuduOutputOperator.beginWindow(3); + simpleKuduOutputOperator.input.process(insertExecutionContext); + KuduExecutionContext<UnitTestTablePojo> deleteExecutionContext = new KuduExecutionContext<>(); + UnitTestTablePojo unitTestTablePojoDelete = new UnitTestTablePojo(); + unitTestTablePojoDelete.setIntrowkey(unitTestTablePojo.getIntrowkey()); + unitTestTablePojoDelete.setStringrowkey(unitTestTablePojo.getStringrowkey()); + unitTestTablePojoDelete.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey()); + deleteExecutionContext.setMutationType(KuduMutationType.DELETE); + deleteExecutionContext.setPayload(unitTestTablePojoDelete); + simpleKuduOutputOperator.input.process(deleteExecutionContext); + simpleKuduOutputOperator.endWindow(); + + UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo(); + unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey()); + unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey()); + 
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals(null, unitTestTablePojoRead.getBinarydata());
+  }
+
+  @Test
+  public void processForInsert() throws Exception
+  {
+    KuduExecutionContext<UnitTestTablePojo> insertExecutionContext = new KuduExecutionContext<>();
+    UnitTestTablePojo unitTestTablePojo = new UnitTestTablePojo();
+    unitTestTablePojo.setIntrowkey(1);
+    unitTestTablePojo.setStringrowkey("one" + System.currentTimeMillis());
+    unitTestTablePojo.setTimestamprowkey(System.currentTimeMillis());
+    unitTestTablePojo.setBooldata(true);
+    unitTestTablePojo.setFloatdata(3.2f);
+    unitTestTablePojo.setStringdata("" + System.currentTimeMillis());
+    unitTestTablePojo.setLongdata(System.currentTimeMillis() + 1);
+    unitTestTablePojo.setTimestampdata(System.currentTimeMillis() + 2);
+    unitTestTablePojo.setBinarydata(ByteBuffer.wrap("stringdata".getBytes()));
+    insertExecutionContext.setMutationType(KuduMutationType.INSERT);
+    insertExecutionContext.setPayload(unitTestTablePojo);
+
+    simpleKuduOutputOperator.beginWindow(0);
+    simpleKuduOutputOperator.input.process(insertExecutionContext);
+    simpleKuduOutputOperator.endWindow();
+
+    UnitTestTablePojo unitTestTablePojoRead = new UnitTestTablePojo();
+    unitTestTablePojoRead.setIntrowkey(unitTestTablePojo.getIntrowkey());
+    unitTestTablePojoRead.setStringrowkey(unitTestTablePojo.getStringrowkey());
+    unitTestTablePojoRead.setTimestamprowkey(unitTestTablePojo.getTimestamprowkey());
+    lookUpAndPopulateRecord(unitTestTablePojoRead);
+    assertEquals("" + unitTestTablePojo.getFloatdata(), "" + unitTestTablePojoRead.getFloatdata());
+  }
+
+}
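
Note: the tests above drive the operator's input port directly with KuduExecutionContext tuples. In an application, an upstream operator would typically do the same wrapping before the stream reaches the Kudu output operator. The sketch below illustrates that pattern; the class name and the choice of INSERT as the default mutation are illustrative assumptions, not part of this commit.

package org.apache.apex.malhar.contrib.kudu;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/** Hypothetical upstream operator that wraps POJOs into KuduExecutionContext tuples. */
public class UnitTestTablePojoWrapper extends BaseOperator
{
  public final transient DefaultOutputPort<KuduExecutionContext<UnitTestTablePojo>> output =
      new DefaultOutputPort<>();

  public final transient DefaultInputPort<UnitTestTablePojo> input =
      new DefaultInputPort<UnitTestTablePojo>()
  {
    @Override
    public void process(UnitTestTablePojo tuple)
    {
      // Every incoming POJO becomes an INSERT mutation here; a real application would choose
      // the mutation type (INSERT/UPDATE/UPSERT/DELETE) based on its own business rules.
      KuduExecutionContext<UnitTestTablePojo> context = new KuduExecutionContext<>();
      context.setMutationType(KuduMutationType.INSERT);
      context.setPayload(tuple);
      output.emit(context);
    }
  };
}
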
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
new file mode 100644
index 0000000..3933df7
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/SimpleKuduOutputOperator.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+import org.apache.kudu.client.ExternalConsistencyMode;
+import org.apache.kudu.client.SessionConfiguration;
+
+public class SimpleKuduOutputOperator extends AbstractKuduOutputOperator
+{
+  @Override
+  ApexKuduConnection.ApexKuduConnectionBuilder getKuduConnectionConfig()
+  {
+    return new ApexKuduConnection.ApexKuduConnectionBuilder()
+      .withAPossibleMasterHostAs("localhost:7051")
+      .withTableName("unittests")
+      .withExternalConsistencyMode(ExternalConsistencyMode.COMMIT_WAIT)
+      .withFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
+      .withNumberOfBossThreads(1)
+      .withNumberOfWorkerThreads(2)
+      .withSocketReadTimeOutAs(3000)
+      .withOperationTimeOutAs(3000);
+  }
+
+  @Override
+  protected boolean isEligibleForPassivationInReconcilingWindow(KuduExecutionContext executionContext,
+      long reconcilingWindowId)
+  {
+    return true;
+  }
+
+  @Override
+  protected Class getTuplePayloadClass()
+  {
+    return UnitTestTablePojo.class;
+  }
+}
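
Note: SimpleKuduOutputOperator above is the concrete implementation the tests exercise. Wiring it into an Apex application would look roughly like the sketch below; the application class, stream names and the upstream wrapper operator (from the earlier sketch) are illustrative assumptions rather than part of this commit.

package org.apache.apex.malhar.contrib.kudu;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;

/** Hypothetical application that streams UnitTestTablePojo mutations into Kudu. */
public class KuduOutputSketchApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // The wrapper turns plain POJOs into KuduExecutionContext tuples; the output operator
    // then applies each mutation to the Kudu table named in its connection configuration.
    UnitTestTablePojoWrapper wrapper = dag.addOperator("mutationBuilder", new UnitTestTablePojoWrapper());
    SimpleKuduOutputOperator kuduOutput = dag.addOperator("kuduOutput", new SimpleKuduOutputOperator());
    dag.addStream("mutations", wrapper.output, kuduOutput.input);
  }
}
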
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
new file mode 100644
index 0000000..dc2cc33
--- /dev/null
+++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/kudu/UnitTestTablePojo.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.kudu;
+
+
+import java.nio.ByteBuffer;
+
+public class UnitTestTablePojo
+{
+  private int introwkey;
+  private String stringrowkey;
+  private long timestamprowkey;
+  private long longdata;
+  private String stringdata;
+  private long timestampdata;
+  private ByteBuffer binarydata;
+  private float floatdata;
+  private boolean booldata;
+
+  public int getIntrowkey()
+  {
+    return introwkey;
+  }
+
+  public void setIntrowkey(int introwkey)
+  {
+    this.introwkey = introwkey;
+  }
+
+  public String getStringrowkey()
+  {
+    return stringrowkey;
+  }
+
+  public void setStringrowkey(String stringrowkey)
+  {
+    this.stringrowkey = stringrowkey;
+  }
+
+  public long getTimestamprowkey()
+  {
+    return timestamprowkey;
+  }
+
+  public void setTimestamprowkey(long timestamprowkey)
+  {
+    this.timestamprowkey = timestamprowkey;
+  }
+
+  public long getLongdata()
+  {
+    return longdata;
+  }
+
+  public void setLongdata(long longdata)
+  {
+    this.longdata = longdata;
+  }
+
+  public String getStringdata()
+  {
+    return stringdata;
+  }
+
+  public void setStringdata(String stringdata)
+  {
+    this.stringdata = stringdata;
+  }
+
+  public long getTimestampdata()
+  {
+    return timestampdata;
+  }
+
+  public void setTimestampdata(long timestampdata)
+  {
+    this.timestampdata = timestampdata;
+  }
+
+  public ByteBuffer getBinarydata()
+  {
+    return binarydata;
+  }
+
+  public void setBinarydata(ByteBuffer binarydata)
+  {
+    this.binarydata = binarydata;
+  }
+
+  public float getFloatdata()
+  {
+    return floatdata;
+  }
+
+  public void setFloatdata(float floatdata)
+  {
+    this.floatdata = floatdata;
+  }
+
+  public boolean isBooldata()
+  {
+    return booldata;
+  }
+
+  public void setBooldata(boolean booldata)
+  {
+    this.booldata = booldata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/10dd94ef/contrib/src/test/resources/kuduoutputoperator.properties
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/kuduoutputoperator.properties b/contrib/src/test/resources/kuduoutputoperator.properties
new file mode 100644
index 0000000..8f41c63
--- /dev/null
+++ b/contrib/src/test/resources/kuduoutputoperator.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+masterhosts=192.168.2.141:7051,192.168.2.133:7051
+tablename=unittests
+pojoclassname=org.apache.apex.malhar.contrib.kudu.UnitTestTablePojo
\ No newline at end of file
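
Note: the properties file supplies the settings the property-driven tests need: a comma-separated list of Kudu master host:port pairs, the target table name, and the fully qualified POJO class. The diff does not show how these keys are consumed, so the snippet below is only an illustrative, self-contained reader for the same file using standard Java APIs.

package org.apache.apex.malhar.contrib.kudu;

import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Illustrative reader for kuduoutputoperator.properties; not part of this commit. */
public class KuduOutputOperatorPropertiesReader
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    try (InputStream in = KuduOutputOperatorPropertiesReader.class.getClassLoader()
        .getResourceAsStream("kuduoutputoperator.properties")) {
      props.load(in);
    }
    // masterhosts is a comma-separated list of host:port pairs
    List<String> masters = Arrays.asList(props.getProperty("masterhosts").split(","));
    String tableName = props.getProperty("tablename");
    // the POJO class is resolved by name so its fields can be introspected
    Class<?> pojoClass = Class.forName(props.getProperty("pojoclassname"));
    System.out.println("Masters: " + masters + ", table: " + tableName + ", POJO: " + pojoClass.getName());
  }
}
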
