http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java index 5eabfe8..b312327 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveQL.java @@ -24,14 +24,24 @@ import org.apache.nifi.annotation.behavior.ReadsAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.ErrorTypes; +import org.apache.nifi.processor.util.pattern.ExceptionHandler; +import org.apache.nifi.processor.util.pattern.ExceptionHandler.OnError; +import org.apache.nifi.processor.util.pattern.PartialFunctions.FetchFlowFiles; +import org.apache.nifi.processor.util.pattern.PartialFunctions.InitConnection; +import org.apache.nifi.processor.util.pattern.Put; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; +import org.apache.nifi.processor.util.pattern.RoutingResult; import java.nio.charset.Charset; import java.sql.Connection; @@ -108,6 +118,7 @@ public class PutHiveQL extends AbstractHiveQLProcessor { _propertyDescriptors.add(BATCH_SIZE); _propertyDescriptors.add(CHARSET); _propertyDescriptors.add(STATEMENT_DELIMITER); + _propertyDescriptors.add(RollbackOnFailure.ROLLBACK_ON_FAILURE); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set<Relationship> _relationships = new HashSet<>(); @@ -117,6 +128,31 @@ public class PutHiveQL extends AbstractHiveQLProcessor { relationships = Collections.unmodifiableSet(_relationships); } + private Put<FunctionContext, Connection> process; + private ExceptionHandler<FunctionContext> exceptionHandler; + + @OnScheduled + public void constructProcess() { + exceptionHandler = new ExceptionHandler<>(); + exceptionHandler.mapException(e -> { + if (e instanceof SQLNonTransientException) { + return ErrorTypes.InvalidInput; + } else if (e instanceof SQLException) { + return ErrorTypes.TemporalFailure; + } else { + return ErrorTypes.UnknownFailure; + } + }); + exceptionHandler.adjustError(RollbackOnFailure.createAdjustError(getLogger())); + + process = new Put<>(); + process.setLogger(getLogger()); + process.initConnection(initConnection); + process.fetchFlowFiles(fetchFlowFiles); + process.putFlowFile(putFlowFile); + process.adjustRoute(RollbackOnFailure.createAdjustRoute(REL_FAILURE, REL_RETRY)); + } + @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { return propertyDescriptors; @@ -127,75 +163,95 @@ public class PutHiveQL extends AbstractHiveQLProcessor { return relationships; } - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final List<FlowFile> flowFiles = session.get(batchSize); + private class FunctionContext extends RollbackOnFailure { + final Charset charset; + final String statementDelimiter; + final long startNanos = System.nanoTime(); + + String connectionUrl; + - if (flowFiles.isEmpty()) { - return; + private FunctionContext(boolean rollbackOnFailure, Charset charset, String statementDelimiter) { + super(rollbackOnFailure, false); + this.charset = charset; + this.statementDelimiter = statementDelimiter; } + } - final long startNanos = System.nanoTime(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + private InitConnection<FunctionContext, Connection> initConnection = (context, session, fc) -> { final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); - final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue(); + final Connection connection = dbcpService.getConnection(); + fc.connectionUrl = dbcpService.getConnectionURL(); + return connection; + }; - try (final Connection conn = dbcpService.getConnection()) { + private FetchFlowFiles<FunctionContext> fetchFlowFiles = (context, session, functionContext, result) -> { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + return session.get(batchSize); + }; - for (FlowFile flowFile : flowFiles) { - try { - final String script = getHiveQL(session, flowFile, charset); - String regex = "(?<!\\\\)" + Pattern.quote(statementDelimiter); + private Put.PutFlowFile<FunctionContext, Connection> putFlowFile = (context, session, fc, conn, flowFile, result) -> { + final String script = getHiveQL(session, flowFile, fc.charset); + String regex = "(?<!\\\\)" + Pattern.quote(fc.statementDelimiter); - String[] hiveQLs = script.split(regex); + String[] hiveQLs = script.split(regex); - int loc = 1; - for (String hiveQL: hiveQLs) { - getLogger().debug("HiveQL: {}", new Object[]{hiveQL}); + exceptionHandler.execute(fc, flowFile, input -> { + int loc = 1; + for (String hiveQL: hiveQLs) { + getLogger().debug("HiveQL: {}", new Object[]{hiveQL}); - if (!StringUtils.isEmpty(hiveQL.trim())) { - final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim()); + if (!StringUtils.isEmpty(hiveQL.trim())) { + final PreparedStatement stmt = conn.prepareStatement(hiveQL.trim()); - // Get ParameterMetadata - // Hive JDBC Doesn't support this yet: - // ParameterMetaData pmd = stmt.getParameterMetaData(); - // int paramCount = pmd.getParameterCount(); + // Get ParameterMetadata + // Hive JDBC Doesn't support this yet: + // ParameterMetaData pmd = stmt.getParameterMetaData(); + // int paramCount = pmd.getParameterCount(); + int paramCount = StringUtils.countMatches(hiveQL, "?"); - int paramCount = StringUtils.countMatches(hiveQL, "?"); + if (paramCount > 0) { + loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes()); + } - if (paramCount > 0) { - loc = setParameters(loc, stmt, paramCount, flowFile.getAttributes()); - } + // Execute the statement + stmt.execute(); + fc.proceed(); + } + } - // Execute the statement - stmt.execute(); - } - } - // Emit a Provenance SEND event - final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + // Emit a Provenance SEND event + final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - fc.startNanos); - session.getProvenanceReporter().send(flowFile, dbcpService.getConnectionURL(), transmissionMillis, true); - session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, fc.connectionUrl, transmissionMillis, true); + result.routeTo(flowFile, REL_SUCCESS); - } catch (final SQLException e) { + }, onFlowFileError(context, session, result)); - if (e instanceof SQLNonTransientException) { - getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[]{flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - } else { - getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{flowFile, e}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_RETRY); - } + }; - } + private OnError<FunctionContext, FlowFile> onFlowFileError(final ProcessContext context, final ProcessSession session, final RoutingResult result) { + OnError<FunctionContext, FlowFile> onFlowFileError = ExceptionHandler.createOnError(context, session, result, REL_FAILURE, REL_RETRY); + onFlowFileError = onFlowFileError.andThen((c, i, r, e) -> { + switch (r.destination()) { + case Failure: + getLogger().error("Failed to update Hive for {} due to {}; routing to failure", new Object[] {i, e}, e); + break; + case Retry: + getLogger().error("Failed to update Hive for {} due to {}; it is possible that retrying the operation will succeed, so routing to retry", + new Object[] {i, e}, e); + break; } - } catch (final SQLException sqle) { - // There was a problem getting the connection, yield and retry the flowfiles - getLogger().error("Failed to get Hive connection due to {}; it is possible that retrying the operation will succeed, so routing to retry", new Object[]{sqle}); - session.transfer(flowFiles, REL_RETRY); - context.yield(); - } + }); + return RollbackOnFailure.createOnError(onFlowFileError); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + final Boolean rollbackOnFailure = context.getProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE).asBoolean(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final String statementDelimiter = context.getProperty(STATEMENT_DELIMITER).getValue(); + final FunctionContext functionContext = new FunctionContext(rollbackOnFailure, charset, statementDelimiter); + RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> process.onTrigger(context, session, functionContext)); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index 1494595..2754f9c 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.hive; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.avro.Schema; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileStream; @@ -46,34 +45,38 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hadoop.KerberosProperties; import org.apache.nifi.hadoop.SecurityUtil; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.DiscontinuedException; +import org.apache.nifi.processor.util.pattern.ErrorTypes; +import org.apache.nifi.processor.util.pattern.ExceptionHandler; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; +import org.apache.nifi.processor.util.pattern.RoutingResult; import org.apache.nifi.util.hive.AuthenticationFailedException; import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveUtils; import org.apache.nifi.util.hive.HiveWriter; -import org.json.JSONException; -import org.json.JSONObject; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -81,6 +84,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.regex.Pattern; /** @@ -96,7 +101,7 @@ import java.util.regex.Pattern; + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.") }) @RequiresInstanceClassLoading -public class PutHiveStreaming extends AbstractProcessor { +public class PutHiveStreaming extends AbstractSessionFactoryProcessor { // Attributes public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count"; @@ -231,21 +236,27 @@ public class PutHiveStreaming extends AbstractProcessor { .defaultValue("10000") .build(); + public static final PropertyDescriptor ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty( + "NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed," + + " (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later)" + + " then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue." + + " Duplicated records can be created for the succeeded ones when the same FlowFile is processed again."); + // Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") - .description("A FlowFile containing the JSON contents of a record is routed to this relationship after the record has been successfully transmitted to Hive.") + .description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.") .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() .name("failure") - .description("A FlowFile containing the JSON contents of a record is routed to this relationship if the record could not be transmitted to Hive.") + .description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.") .build(); public static final Relationship REL_RETRY = new Relationship.Builder() .name("retry") .description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that " - + "some records may have been processed successfully, they will be routed (as JSON flow files) to the success relationship. " + + "some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. " + "The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This " + "can be used to provide a retry capability since full rollback is not possible.") .build(); @@ -283,6 +294,7 @@ public class PutHiveStreaming extends AbstractProcessor { props.add(HEARTBEAT_INTERVAL); props.add(TXNS_PER_BATCH); props.add(RECORDS_PER_TXN); + props.add(ROLLBACK_ON_FAILURE); kerberosConfigFile = context.getKerberosConfigurationFile(); kerberosProperties = new KerberosProperties(kerberosConfigFile); @@ -364,8 +376,213 @@ public class PutHiveStreaming extends AbstractProcessor { setupHeartBeatTimer(); } + private static class FunctionContext extends RollbackOnFailure { + + private FlowFile inputFlowFile; + private AtomicReference<FlowFile> successFlowFile; + private AtomicReference<FlowFile> failureFlowFile; + private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); + private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); + + private final AtomicInteger recordCount = new AtomicInteger(0); + private final AtomicInteger successfulRecordCount = new AtomicInteger(0); + private final AtomicInteger failedRecordCount = new AtomicInteger(0); + + private volatile ExecutorService appendRecordThreadPool; + private volatile AtomicBoolean closed = new AtomicBoolean(false); + private final BlockingQueue<List<HiveStreamingRecord>> successRecordQueue = new ArrayBlockingQueue<>(100); + private final BlockingQueue<List<HiveStreamingRecord>> failureRecordQueue = new ArrayBlockingQueue<>(100); + + private final ComponentLog logger; + + /** + * It's possible that multiple Hive streaming transactions are committed within a single onTrigger. + * PutHiveStreaming onTrigger is not 'transactional' in a sense of RollbackOnFailure. + * Once a Hive streaming transaction is committed, processor session will not be rolled back. + * @param rollbackOnFailure whether process session should be rolled back if failed + */ + private FunctionContext(boolean rollbackOnFailure, ComponentLog logger) { + super(rollbackOnFailure, false); + this.logger = logger; + } + + private void setFlowFiles(FlowFile inputFlowFile, FlowFile successFlowFile, FlowFile failureFlowFile) { + this.inputFlowFile = inputFlowFile; + this.successFlowFile = new AtomicReference<>(successFlowFile); + this.failureFlowFile = new AtomicReference<>(failureFlowFile); + } + + private void initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader, + DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef, + BlockingQueue<List<HiveStreamingRecord>> queue, Function<Integer, Boolean> isCompleted) { + + writer.setCodec(CodecFactory.fromString(codec)); + // Transfer metadata (this is a subset of the incoming file) + for (String metaKey : reader.getMetaKeys()) { + if (!RESERVED_METADATA.contains(metaKey)) { + writer.setMeta(metaKey, reader.getMeta(metaKey)); + } + } + + appendRecordThreadPool.submit(() -> { + flowFileRef.set(session.append(flowFileRef.get(), (out) -> { + // Create writer so that records can be appended. + writer.create(reader.getSchema(), out); + + try { + int writtenCount = 0; + while (true) { + + if (closed.get() && isCompleted.apply(writtenCount)) { + break; + } + + final List<HiveStreamingRecord> hRecords = queue.poll(100, TimeUnit.MILLISECONDS); + if (hRecords != null) { + try { + for (HiveStreamingRecord hRecord : hRecords) { + writer.append(hRecord.getRecord()); + writtenCount++; + } + } catch (IOException ioe) { + // The records were put to Hive Streaming successfully, but there was an error while writing the + // Avro records to the flow file. Log as an error and move on. + logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, ioe); + } + } + } + writer.flush(); + } catch (InterruptedException e) { + logger.warn("Append record thread is interrupted, " + e, e); + } + + })); + }); + } + + private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) { + appendRecordThreadPool = Executors.newFixedThreadPool(2); + initAvroWriter(session, codec, reader, successAvroWriter, successFlowFile, successRecordQueue, w -> w == successfulRecordCount.get()); + initAvroWriter(session, codec, reader, failureAvroWriter, failureFlowFile, failureRecordQueue, w -> w == failedRecordCount.get()); + + // No new task. + appendRecordThreadPool.shutdown(); + } + + private void appendRecordsToSuccess(List<HiveStreamingRecord> records) { + appendRecordsToFlowFile(records, successRecordQueue); + successfulRecordCount.addAndGet(records.size()); + } + + private void appendRecordsToFailure(List<HiveStreamingRecord> records) { + appendRecordsToFlowFile(records, failureRecordQueue); + failedRecordCount.addAndGet(records.size()); + } + + private void appendRecordsToFlowFile(List<HiveStreamingRecord> records, BlockingQueue<List<HiveStreamingRecord>> queue) { + if (!queue.add(records)) { + throw new ProcessException(String.format("Failed to append %d records due to insufficient internal queue capacity.", records.size())); + } + } + + private void transferFlowFiles(ProcessSession session, RoutingResult result, String transitUri) { + + closeAvroWriters(); + + if (successfulRecordCount.get() > 0) { + // Transfer the flow file with successful records + successFlowFile.set( + session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(successfulRecordCount.get()))); + session.getProvenanceReporter().send(successFlowFile.get(), transitUri); + result.routeTo(successFlowFile.get(), REL_SUCCESS); + } else { + session.remove(successFlowFile.get()); + } + + if (failedRecordCount.get() > 0) { + // There were some failed records, so transfer that flow file to failure + failureFlowFile.set( + session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(failedRecordCount.get()))); + result.routeTo(failureFlowFile.get(), REL_FAILURE); + } else { + session.remove(failureFlowFile.get()); + } + + result.getRoutedFlowFiles().forEach((relationship, flowFiles) -> { + session.transfer(flowFiles, relationship); + }); + } + + private void closeAvroWriters() { + closed.set(true); + if (appendRecordThreadPool != null) { + // Having null thread pool means the input FlowFile was not processed at all, due to illegal format. + try { + if (!appendRecordThreadPool.awaitTermination(10, TimeUnit.SECONDS)) { + logger.warn("Waiting for Avro records being appended into output FlowFiles has been timeout."); + } + } catch (InterruptedException e) { + logger.warn("Waiting for Avro records being appended into output FlowFiles has been interrupted."); + } + } + } + } + + private static class ShouldRetryException extends RuntimeException { + private ShouldRetryException(String message, Throwable cause) { + super(message, cause); + } + } + + private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session) { + return RollbackOnFailure.createOnError((fc, input, res, e) -> { + + if (res.penalty() == ErrorTypes.Penalty.Yield) { + context.yield(); + } + + switch (res.destination()) { + case Failure: + // Add the failed record to the failure flow file + getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e); + fc.appendRecordsToFailure(input); + break; + + case Retry: + // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry + abortAndCloseWriters(); + throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + e, e); + + case Self: + abortAndCloseWriters(); + break; + + default: + abortAndCloseWriters(); + if (e instanceof ProcessException) { + throw (ProcessException) e; + } else { + throw new ProcessException(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), e); + } + } + }); + } + + private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session) { + return (fc, input, res, e) -> onHiveRecordsError(context, session).apply(fc, Collections.singletonList(input), res, e); + } + + private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session) { + return (fc, input, res, e) -> onHiveRecordError(context, session).apply(fc, new HiveStreamingRecord(null, input), res, e); + } + @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + final FunctionContext functionContext = new FunctionContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(), getLogger()); + RollbackOnFailure.onTrigger(context, sessionFactory, functionContext, getLogger(), session -> onTrigger(context, session, functionContext)); + } + + private void onTrigger(ProcessContext context, ProcessSession session, FunctionContext functionContext) throws ProcessException { FlowFile flowFile = session.get(); if (flowFile == null) { return; @@ -390,22 +607,58 @@ public class PutHiveStreaming extends AbstractProcessor { } } - final AtomicInteger recordCount = new AtomicInteger(0); - final AtomicInteger successfulRecordCount = new AtomicInteger(0); - List<HiveStreamingRecord> successfulRecords = new LinkedList<>(); + final AtomicReference<List<HiveStreamingRecord>> successfulRecords = new AtomicReference<>(); + successfulRecords.set(new ArrayList<>()); final FlowFile inputFlowFile = flowFile; - final AtomicBoolean processingFailure = new AtomicBoolean(false); + + final RoutingResult result = new RoutingResult(); + final ExceptionHandler<FunctionContext> exceptionHandler = new ExceptionHandler<>(); + exceptionHandler.mapException(s -> { + try { + if (s == null) { + return ErrorTypes.PersistentFailure; + } + throw s; + + } catch (IllegalArgumentException + | HiveWriter.WriteFailure + | SerializationError inputError) { + + return ErrorTypes.InvalidInput; + + } catch (HiveWriter.CommitFailure + | HiveWriter.TxnBatchFailure + | HiveWriter.TxnFailure writerTxError) { + + return ErrorTypes.TemporalInputFailure; + + } catch (ConnectionError + | HiveWriter.ConnectFailure connectionError) { + // Can't connect to Hive endpoint. + log.error("Error connecting to Hive endpoint: table {} at {}", + new Object[]{options.getTableName(), options.getMetaStoreURI()}); + + return ErrorTypes.TemporalFailure; + + } catch (IOException + | InterruptedException tempError) { + return ErrorTypes.TemporalFailure; + + } catch (Exception t) { + return ErrorTypes.UnknownFailure; + } + }); + final BiFunction<FunctionContext, ErrorTypes, ErrorTypes.Result> adjustError = RollbackOnFailure.createAdjustError(getLogger()); + exceptionHandler.adjustError(adjustError); // Create output flow files and their Avro writers - AtomicReference<FlowFile> successFlowFile = new AtomicReference<>(session.create(inputFlowFile)); - final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); - AtomicReference<FlowFile> failureFlowFile = new AtomicReference<>(session.create(inputFlowFile)); - final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>()); + functionContext.setFlowFiles(inputFlowFile, session.create(inputFlowFile), session.create(inputFlowFile)); try { session.read(inputFlowFile, in -> { try (final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) { + GenericRecord currRecord = null; // Copy codec and schema information to all writers @@ -413,239 +666,133 @@ public class PutHiveStreaming extends AbstractProcessor { ? DataFileConstants.NULL_CODEC : reader.getMetaString(DataFileConstants.CODEC); - Arrays.asList(successAvroWriter, failureAvroWriter) - .forEach((writer) -> { - writer.setCodec(CodecFactory.fromString(codec)); - // Transfer metadata (this is a subset of the incoming file) - for (String metaKey : reader.getMetaKeys()) { - if (!RESERVED_METADATA.contains(metaKey)) { - writer.setMeta(metaKey, reader.getMeta(metaKey)); - } - } - }); + functionContext.initAvroWriters(session, codec, reader); + + Runnable flushSuccessfulRecords = () -> { + // Now send the records to the successful FlowFile and update the success count + functionContext.appendRecordsToSuccess(successfulRecords.get()); + // Clear the list of successful records, we'll use it at the end when we flush whatever records are left + successfulRecords.set(new ArrayList<>()); + }; while (reader.hasNext()) { - currRecord = reader.next(currRecord); - recordCount.incrementAndGet(); + // We can NOT reuse currRecord here, because currRecord is accumulated in successful records. + // If we use the same GenericRecord instance, every record ends up having the same contents. + // To avoid this, we need to create a brand new GenericRecord instance here each time. + currRecord = reader.next(); + functionContext.recordCount.incrementAndGet(); // Extract the partition values (they must be put separately into the Hive Streaming API) List<String> partitionValues = new ArrayList<>(); - try { + if (!exceptionHandler.execute(functionContext, currRecord, input -> { for (String partition : partitionColumnList) { - Object partitionValue = currRecord.get(partition); + Object partitionValue = input.get(partition); if (partitionValue == null) { - throw new IOException("Partition column '" + partition + "' not found in Avro record"); + throw new IllegalArgumentException("Partition column '" + partition + "' not found in Avro record"); } partitionValues.add(partitionValue.toString()); } - } catch (IOException ioe) { - // Add the failed record to the failure flow file - log.error("Error writing record to Hive Streaming transaction", ioe); - appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)), - failureFlowFile, failureAvroWriter, reader); + }, onRecordError(context, session))) { continue; } - List<Schema.Field> fields = currRecord.getSchema().getFields(); - if (fields != null) { - JSONObject obj = new JSONObject(); - try { - for (Schema.Field field : fields) { - String fieldName = field.name(); - // Skip fields that are partition columns, we extracted those values above to create an EndPoint - if (!partitionColumnList.contains(fieldName)) { - Object value = currRecord.get(fieldName); - try { - obj.put(fieldName, value); - } catch (JSONException je) { - throw new IOException(je); - } - } - } - } catch (IOException ioe) { - // This really shouldn't happen since we are iterating over the schema fields, but just in case, - // add the failed record to the failure flow file. - log.error("Error writing record to Hive Streaming transaction", ioe); - appendRecordsToFlowFile(session, Collections.singletonList(new HiveStreamingRecord(null, currRecord)), - failureFlowFile, failureAvroWriter, reader); - continue; - } - final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord); - HiveEndPoint endPoint = null; - HiveWriter hiveWriter = null; - try { - endPoint = makeHiveEndPoint(record.getPartitionValues(), options); - hiveWriter = getOrCreateWriter(endPoint); - } catch (ConnectionError - | HiveWriter.ConnectFailure - | InterruptedException connectionError) { - // Can't connect to Hive endpoint. - log.error("Error connecting to Hive endpoint: table {} at {}", - new Object[]{options.getTableName(), options.getMetaStoreURI()}); - // If we can't connect to the endpoint, exit the loop and let the outer exception handler route the original flow file to retry - abortAndCloseWriters(); - throw new ProcessException(connectionError); - } - try { - try { - hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); - successfulRecords.add(record); - } catch (InterruptedException | HiveWriter.WriteFailure wf) { - // Add the failed record to the failure flow file - log.error("Error writing record to Hive Streaming transaction", wf); - appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader); - } + final HiveStreamingRecord record = new HiveStreamingRecord(partitionValues, currRecord); + final AtomicReference<HiveWriter> hiveWriterRef = new AtomicReference<>(); - // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records - if (hiveWriter.getTotalRecords() >= recordsPerTxn) { - hiveWriter.flush(true); - // Now send the records to the success relationship and update the success count - try { - appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader); - successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr); - - // Clear the list of successful records, we'll use it at the end when we flush whatever records are left - successfulRecords.clear(); - - } catch (IOException ioe) { - // The records were put to Hive Streaming successfully, but there was an error while writing the - // Avro records to the flow file. Log as an error and move on. - getLogger().error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file", ioe); - } - } + // Write record to Hive streaming + if (!exceptionHandler.execute(functionContext, record, input -> { - } catch (InterruptedException - | HiveWriter.CommitFailure - | HiveWriter.TxnBatchFailure - | HiveWriter.TxnFailure - | SerializationError writeException) { - - log.error("Error writing record to Hive Streaming transaction", writeException); - // Add the failed record to the failure flow file - appendRecordsToFlowFile(session, Collections.singletonList(record), failureFlowFile, failureAvroWriter, reader); - - if (!(writeException instanceof SerializationError)) { - try { - hiveWriter.abort(); - } catch (Exception e) { - // Can't even abort properly, throw a process exception - throw new ProcessException(e); - } + final HiveEndPoint endPoint = makeHiveEndPoint(record.getPartitionValues(), options); + final HiveWriter hiveWriter = getOrCreateWriter(endPoint); + hiveWriterRef.set(hiveWriter); + + hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8)); + successfulRecords.get().add(record); + + }, onHiveRecordError(context, session))) { + continue; + } + + // If we've reached the records-per-transaction limit, flush the Hive Writer and update the Avro Writer for successful records + final HiveWriter hiveWriter = hiveWriterRef.get(); + if (hiveWriter.getTotalRecords() >= recordsPerTxn) { + exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { + + hiveWriter.flush(true); + // Proceed function context. Process session can't be rollback anymore. + functionContext.proceed(); + + // Now send the records to the success relationship and update the success count + flushSuccessfulRecords.run(); + + }, onHiveRecordsError(context, session).andThen((fc, input, res, commitException) -> { + // Reset hiveWriter for succeeding records. + switch (res.destination()) { + case Retry: + case Failure: + try { + // Abort current tx and move to next. + hiveWriter.abort(); + } catch (Exception e) { + // Can't even abort properly, throw a process exception + throw new ProcessException(e); + } } - } + })); } } - try { + + exceptionHandler.execute(functionContext, successfulRecords.get(), input -> { // Finish any transactions flushAllWriters(true); closeAllWriters(); // Now send any remaining records to the success relationship and update the count - appendRecordsToFlowFile(session, successfulRecords, successFlowFile, successAvroWriter, reader); - successfulRecordCount.accumulateAndGet(successfulRecords.size(), (current, incr) -> current + incr); - successfulRecords.clear(); - - } catch (HiveWriter.CommitFailure - | HiveWriter.TxnBatchFailure - | HiveWriter.TxnFailure - | InterruptedException e) { - - // If any records are in the successfulRecords list but ended up here, then they actually weren't transferred successfully, so - // route them to failure instead - appendRecordsToFlowFile(session, successfulRecords, failureFlowFile, failureAvroWriter, reader); - } + flushSuccessfulRecords.run(); + + // Append successfulRecords on failure. + }, onHiveRecordsError(context, session)); + } catch (IOException ioe) { // The Avro file is invalid (or may not be an Avro file at all), send it to failure - log.error("The incoming flow file can not be read as an Avro file, routing to failure", ioe); - processingFailure.set(true); - } - }); - - if (recordCount.get() > 0) { - if (successfulRecordCount.get() > 0) { - // Transfer the flow file with successful records - successFlowFile.set( - session.putAttribute(successFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(recordCount.get()))); - session.getProvenanceReporter().send(successFlowFile.get(), options.getMetaStoreURI()); - session.transfer(successFlowFile.get(), REL_SUCCESS); - } else { - session.remove(successFlowFile.get()); - } + final ErrorTypes.Result adjusted = adjustError.apply(functionContext, ErrorTypes.InvalidInput); + final String msg = "The incoming flow file can not be read as an Avro file"; + switch (adjusted.destination()) { + case Failure: + log.error(msg, ioe); + result.routeTo(inputFlowFile, REL_FAILURE); + break; + case ProcessException: + throw new ProcessException(msg, ioe); - if (recordCount.get() != successfulRecordCount.get()) { - // There were some failed records, so transfer that flow file to failure - failureFlowFile.set( - session.putAttribute(failureFlowFile.get(), HIVE_STREAMING_RECORD_COUNT_ATTR, - Integer.toString(recordCount.get() - successfulRecordCount.get()))); - session.transfer(failureFlowFile.get(), REL_FAILURE); - } else { - session.remove(failureFlowFile.get()); + } } - } else { - // No records were processed, so remove the output flow files - session.remove(successFlowFile.get()); - session.remove(failureFlowFile.get()); - } - successFlowFile.set(null); - failureFlowFile.set(null); + }); // If we got here, we've processed the outgoing flow files correctly, so remove the incoming one if necessary - if (processingFailure.get()) { - session.transfer(inputFlowFile, REL_FAILURE); - } else { - session.remove(flowFile); + if (result.getRoutedFlowFiles().values().stream().noneMatch(routed -> routed.contains(inputFlowFile))) { + session.remove(inputFlowFile); } - } catch (ProcessException pe) { - abortAndCloseWriters(); - Throwable t = pe.getCause(); - if (t != null) { - if (t instanceof ConnectionError - || t instanceof HiveWriter.ConnectFailure - || t instanceof HiveWriter.CommitFailure - || t instanceof HiveWriter.TxnBatchFailure - || t instanceof HiveWriter.TxnFailure - || t instanceof InterruptedException) { - log.error("Hive Streaming connect/write error, flow file will be penalized and routed to retry", t); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_RETRY); - // Remove the ones we created - if (successFlowFile.get() != null) { - session.remove(successFlowFile.get()); - } - if (failureFlowFile.get() != null) { - session.remove(failureFlowFile.get()); - } - } else { - throw pe; - } - } else { - throw pe; - } + } catch (DiscontinuedException e) { + // The input FlowFile processing is discontinued. Keep it in the input queue. + getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e); + result.routeTo(flowFile, Relationship.SELF); + + } catch (ShouldRetryException e) { + // This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. + getLogger().error(e.getMessage(), e); + flowFile = session.penalize(flowFile); + result.routeTo(flowFile, REL_RETRY); + } finally { + functionContext.transferFlowFiles(session, result, options.getMetaStoreURI()); // Restore original class loader, might not be necessary but is good practice since the processor task changed it Thread.currentThread().setContextClassLoader(originalClassloader); } } - private void appendRecordsToFlowFile(ProcessSession session, - List<HiveStreamingRecord> records, - AtomicReference<FlowFile> appendFlowFile, - DataFileWriter<GenericRecord> avroWriter, - DataFileStream<GenericRecord> reader) throws IOException { - - appendFlowFile.set(session.append(appendFlowFile.get(), (out) -> { - - try (DataFileWriter<GenericRecord> writer = avroWriter.create(reader.getSchema(), out)) { - for (HiveStreamingRecord sRecord : records) { - writer.append(sRecord.getRecord()); - } - writer.flush(); - } - })); - } - @OnStopped public void cleanup() { ComponentLog log = getLogger(); http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 342fada..e61fa9f 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -50,11 +50,13 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.PartialFunctions; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.hive.CsvOutputOptions; import org.apache.nifi.util.hive.HiveJdbcCommon; @@ -209,7 +211,11 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + PartialFunctions.onTrigger(context, sessionFactory, getLogger(), session -> onTrigger(context, session)); + } + + private void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final FlowFile fileToProcess = (context.hasIncomingConnection()? session.get():null); FlowFile flowfile = null; http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java index c7498f9..5624f79 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveQL.java @@ -19,6 +19,7 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.dbcp.DBCPService; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.pattern.RollbackOnFailure; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -42,6 +43,7 @@ import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestPutHiveQL { private static final String createPersons = "CREATE TABLE PERSONS (id integer primary key, name varchar(100), code integer)"; @@ -128,6 +130,91 @@ public class TestPutHiveQL { runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3); } + @Test + public void testFailInMiddleWithBadStatementRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', 84)".getBytes()); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + // The 1st one should be routed to success, others should stay in queue. + assertEquals(3, runner.getQueueSize().getObjectCount()); + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 1); + } + + @Test + public void testFailAtBeginning() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + runner.run(); + + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1); + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 2); + } + + @Test + public void testFailAtBeginningRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.enqueue("INSERT INTO PERSONS".getBytes()); // intentionally wrong syntax + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Tom', 3)".getBytes()); + runner.enqueue("INSERT INTO PERSONS (NAME, CODE) VALUES ('Harry', 44)".getBytes()); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + assertEquals(3, runner.getQueueSize().getObjectCount()); + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 0); + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 0); + } @Test public void testFailInMiddleWithBadParameterType() throws InitializationException, ProcessException, SQLException, IOException { @@ -189,7 +276,56 @@ public class TestPutHiveQL { final Map<String, String> badAttributes = new HashMap<>(); badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); - badAttributes.put("hiveql.args.1.value", "9999"); + badAttributes.put("hiveql.args.1.value", "101"); // Constraint violation, up to 100 + + final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, badAttributes); + runner.enqueue(data, goodAttributes); + runner.enqueue(data, goodAttributes); + runner.run(); + + runner.assertTransferCount(PutHiveQL.REL_SUCCESS, 3); + runner.assertTransferCount(PutHiveQL.REL_FAILURE, 1); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Mark", rs.getString(2)); + assertEquals(84, rs.getInt(3)); + assertTrue(rs.next()); + assertTrue(rs.next()); + assertFalse(rs.next()); + } + } + } + + @Test + public void testFailInMiddleWithBadNumberFormat() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final File tempDir = folder.getRoot(); + final File dbDir = new File(tempDir, "db"); + final DBCPService service = new MockDBCPService(dbDir.getAbsolutePath()); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + try (final Connection conn = service.getConnection()) { + try (final Statement stmt = conn.createStatement()) { + stmt.executeUpdate(createPersonsAutoId); + } + } + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + + final Map<String, String> goodAttributes = new HashMap<>(); + goodAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + goodAttributes.put("hiveql.args.1.value", "84"); + + final Map<String, String> badAttributes = new HashMap<>(); + badAttributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + badAttributes.put("hiveql.args.1.value", "NOT_NUMBER"); final byte[] data = "INSERT INTO PERSONS (NAME, CODE) VALUES ('Mark', ?)".getBytes(); runner.enqueue(data, goodAttributes); @@ -540,6 +676,44 @@ public class TestPutHiveQL { runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 1); } + @Test + public void testRetryableFailureRollbackOnFailure() throws InitializationException, ProcessException, SQLException, IOException { + final TestRunner runner = TestRunners.newTestRunner(PutHiveQL.class); + final DBCPService service = new SQLExceptionService(null); + runner.addControllerService("dbcp", service); + runner.enableControllerService(service); + + runner.setProperty(PutHiveQL.HIVE_DBCP_SERVICE, "dbcp"); + runner.setProperty(RollbackOnFailure.ROLLBACK_ON_FAILURE, "true"); + + final String sql = "INSERT INTO PERSONS (ID, NAME, CODE) VALUES (?, ?, ?); " + + "UPDATE PERSONS SET NAME='George' WHERE ID=?; "; + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.1.value", "1"); + + attributes.put("hiveql.args.2.type", String.valueOf(Types.VARCHAR)); + attributes.put("hiveql.args.2.value", "Mark"); + + attributes.put("hiveql.args.3.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.3.value", "84"); + + attributes.put("hiveql.args.4.type", String.valueOf(Types.INTEGER)); + attributes.put("hiveql.args.4.value", "1"); + + runner.enqueue(sql.getBytes(), attributes); + try { + runner.run(); + fail("Should throw ProcessException"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + assertEquals(1, runner.getQueueSize().getObjectCount()); + runner.assertAllFlowFilesTransferred(PutHiveQL.REL_RETRY, 0); + } + /** * Simple implementation only for testing purposes */ @@ -607,7 +781,7 @@ public class TestPutHiveQL { @Override public String getConnectionURL() { - return service.getConnectionURL(); + return service != null ? service.getConnectionURL() : null; } } }
