http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java new file mode 100644 index 0000000..6a65783 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -0,0 +1,878 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.streaming.ConnectionStats; +import org.apache.hive.streaming.HiveRecordWriter; +import org.apache.hive.streaming.PartitionInfo; +import org.apache.hive.streaming.RecordWriter; +import org.apache.hive.streaming.StreamingConnection; +import org.apache.hive.streaming.StreamingException; +import org.apache.hive.streaming.StubConnectionError; +import org.apache.hive.streaming.StubSerializationError; +import org.apache.hive.streaming.StubStreamingIOFailure; +import org.apache.hive.streaming.StubTransactionError; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceInitializationContext; +import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.kerberos.KerberosCredentialsService; +import org.apache.nifi.processor.exception.ProcessException; +import 
org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.hive.HiveConfigurator; +import org.apache.nifi.util.hive.HiveOptions; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +import static org.apache.nifi.processors.hive.AbstractHive3QLProcessor.ATTR_OUTPUT_TABLES; +import static org.apache.nifi.processors.hive.PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR; +import static org.apache.nifi.processors.hive.PutHive3Streaming.KERBEROS_CREDENTIALS_SERVICE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for PutHive3Streaming processor. 
+ */ +public class TestPutHive3Streaming { + + private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml"; + private static final String TARGET_HIVE = "target/hive"; + + private TestRunner runner; + private MockPutHive3Streaming processor; + + private HiveConfigurator hiveConfigurator; + private HiveConf hiveConf; + private UserGroupInformation ugi; + private Schema schema; + + @Before + public void setUp() throws Exception { + + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + + Configuration testConf = new Configuration(); + testConf.addResource(new Path(TEST_CONF_PATH)); + + // needed for calls to UserGroupInformation.setConfiguration() to work when passing in + // config with Kerberos authentication enabled + System.setProperty("java.security.krb5.realm", "nifi.com"); + System.setProperty("java.security.krb5.kdc", "nifi.kdc"); + + ugi = null; + processor = new MockPutHive3Streaming(); + hiveConfigurator = mock(HiveConfigurator.class); + hiveConf = new HiveConf(); + when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf); + processor.hiveConfigurator = hiveConfigurator; + + // Delete any temp files from previous tests + try { + FileUtils.deleteDirectory(new File(TARGET_HIVE)); + } catch (IOException ioe) { + // Do nothing, directory may not have existed + } + } + + private void configure(final PutHive3Streaming processor, final int numUsers) throws InitializationException { + configure(processor, numUsers, -1); + } + + private void configure(final PutHive3Streaming processor, final int numUsers, int failAfter) throws InitializationException { + configure(processor, numUsers, failAfter, null); + } + + private void configure(final PutHive3Streaming processor, final int numUsers, final int failAfter, + final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException { + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + if (recordGenerator == null) { + for (int i = 0; i < numUsers; i++) { + readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0); + } + } else { + recordGenerator.apply(numUsers, readerFactory); + } + + readerFactory.failAfter(failAfter); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + } + + private void configureComplex(final MockPutHive3Streaming processor, final int numUsers, final int failAfter, + final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws IOException, InitializationException { + final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/array_of_records.avsc"), StandardCharsets.UTF_8); + schema = new Schema.Parser().parse(avroSchema); + processor.setFields(Arrays.asList(new FieldSchema("records", + serdeConstants.LIST_TYPE_NAME + "<" + + serdeConstants.MAP_TYPE_NAME + "<" + + serdeConstants.STRING_TYPE_NAME + "," + + serdeConstants.STRING_TYPE_NAME + 
">>", ""))); + runner = TestRunners.newTestRunner(processor); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); + MockRecordParser readerFactory = new MockRecordParser(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); + for (final RecordField recordField : recordSchema.getFields()) { + readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable()); + } + + if (recordGenerator == null) { + Object[] mapArray = new Object[numUsers]; + for (int i = 0; i < numUsers; i++) { + final int x = i; + Map<String, Object> map = new HashMap<String, Object>() {{ + put("name", "name" + x); + put("age", x * 5); + }}; + mapArray[i] = map; + } + readerFactory.addRecord((Object)mapArray); + } else { + recordGenerator.apply(numUsers, readerFactory); + } + + readerFactory.failAfter(failAfter); + + runner.addControllerService("mock-reader-factory", readerFactory); + runner.enableControllerService(readerFactory); + + runner.setProperty(PutHive3Streaming.RECORD_READER, "mock-reader-factory"); + } + + @Test + public void testSetup() throws Exception { + configure(processor, 0); + runner.assertNotValid(); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.assertNotValid(); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.assertValid(); + runner.run(); + } + + @Test + public void testUgiGetsCleared() throws Exception { + configure(processor, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + processor.ugi = mock(UserGroupInformation.class); + runner.run(); + assertNull(processor.ugi); + } + + @Test + public void testUgiGetsSetIfSecure() throws Exception { + configure(processor, 1); + hiveConf.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, SecurityUtil.KERBEROS); + KerberosCredentialsService kcs = new MockKerberosCredentialsService(); + runner.addControllerService("kcs", kcs); + runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs"); + runner.enableControllerService(kcs); + ugi = mock(UserGroupInformation.class); + when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString())).thenReturn(ugi); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + } + + @Test(expected = AssertionError.class) + public void testSetupWithKerberosAuthFailed() throws Exception { + configure(processor, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, "src/test/resources/core-site-security.xml, src/test/resources/hive-site-security.xml"); + + hiveConf.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, SecurityUtil.KERBEROS); + KerberosCredentialsService kcs = new MockKerberosCredentialsService(null, null); + runner.addControllerService("kcs", kcs); + runner.setProperty(KERBEROS_CREDENTIALS_SERVICE, "kcs"); + runner.enableControllerService(kcs); + runner.assertNotValid(); + runner.run(); + } + + @Test + public void 
onTrigger() throws Exception { + configure(processor, 1); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void onTriggerComplex() throws Exception { + configureComplex(processor, 10, -1, null); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + // Schema is an array of size 10, so only one record is output + assertEquals("1", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void onTriggerBadInput() throws Exception { + configure(processor, 1, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue("I am not an Avro record".getBytes()); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + } + + @Test + public void onTriggerBadInputRollbackOnFailure() throws Exception { + configure(processor, 1, 0); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + runner.enqueue("I am not an Avro record".getBytes()); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + // Assert incoming FlowFile stays in input queue. 
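+ // (With ROLLBACK_ON_FAILURE set, the failure is rethrown as a ProcessException and the session is rolled back rather than the FlowFile being routed to a failure relationship, hence the queue-size check below.)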
+ assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + + @Test + public void onTriggerMultipleRecordsSingleTransaction() throws Exception { + configure(processor, 3); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + Map<String, Object> user1 = new HashMap<String, Object>() { + { + put("name", "Joe"); + put("favorite_number", 146); + } + }; + Map<String, Object> user2 = new HashMap<String, Object>() { + { + put("name", "Mary"); + put("favorite_number", 42); + } + }; + Map<String, Object> user3 = new HashMap<String, Object>() { + { + put("name", "Matt"); + put("favorite_number", 3); + } + }; + final List<Map<String, Object>> users = Arrays.asList(user1, user2, user3); + runner.enqueue(createAvroRecord(users)); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 1); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_SUCCESS).get(0); + assertOutputAvroRecords(users, resultFlowFile); + } + + @Test + public void onTriggerMultipleRecordsFailInMiddle() throws Exception { + configure(processor, 4); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + processor.setGenerateWriteFailure(true); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + } + + @Test + public void onTriggerMultipleRecordsFailInMiddleRollbackOnFailure() throws Exception { + configure(processor, 3); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + processor.setGenerateWriteFailure(true); + runner.enqueue(new byte[0]); + try { + runner.run(); + fail("ProcessException should be thrown, because no Hive transaction has been committed yet."); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + // Assert incoming FlowFile stays in input queue.
+ assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + private void assertOutputAvroRecords(List<Map<String, Object>> expectedRecords, MockFlowFile resultFlowFile) throws IOException { + assertEquals(String.valueOf(expectedRecords.size()), resultFlowFile.getAttribute(PutHive3Streaming.HIVE_STREAMING_RECORD_COUNT_ATTR)); + + final DataFileStream<GenericRecord> reader = new DataFileStream<>( + new ByteArrayInputStream(resultFlowFile.toByteArray()), + new GenericDatumReader<>()); + + Schema schema = reader.getSchema(); + + // Verify that the schema is preserved + assertEquals(schema, new Schema.Parser().parse(new File("src/test/resources/user.avsc"))); + + GenericRecord record = null; + for (Map<String, Object> expectedRecord : expectedRecords) { + assertTrue(reader.hasNext()); + record = reader.next(record); + final String name = record.get("name").toString(); + final Integer favorite_number = (Integer) record.get("favorite_number"); + assertNotNull(name); + assertNotNull(favorite_number); + assertNull(record.get("favorite_color")); + assertNull(record.get("scale")); + + assertEquals(expectedRecord.get("name"), name); + assertEquals(expectedRecord.get("favorite_number"), favorite_number); + } + assertFalse(reader.hasNext()); + } + + @Test + public void onTriggerWithConnectFailure() throws Exception { + configure(processor, 1); + processor.setGenerateConnectFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + // Assert incoming FlowFile stays in input queue. + assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + @Test + public void onTriggerWithConnectFailureRollbackOnFailure() throws Exception { + configure(processor, 1); + processor.setGenerateConnectFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + runner.enqueue(new byte[0]); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + // Assert incoming FlowFile stays in input queue. 
+ assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + @Test + public void onTriggerWithWriteFailure() throws Exception { + configure(processor, 2); + processor.setGenerateWriteFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + final MockFlowFile flowFile = runner.getFlowFilesForRelationship(PutHive3Streaming.REL_FAILURE).get(0); + assertEquals("0", flowFile.getAttribute(HIVE_STREAMING_RECORD_COUNT_ATTR)); + assertEquals("default.users", flowFile.getAttribute(ATTR_OUTPUT_TABLES)); + } + + @Test + public void onTriggerWithWriteFailureRollbackOnFailure() throws Exception { + configure(processor, 2); + processor.setGenerateWriteFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + Map<String, Object> user1 = new HashMap<String, Object>() { + { + put("name", "Joe"); + put("favorite_number", 146); + } + }; + Map<String, Object> user2 = new HashMap<String, Object>() { + { + put("name", "Mary"); + put("favorite_number", 42); + } + }; + runner.enqueue(createAvroRecord(Arrays.asList(user1, user2))); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + // Assert incoming FlowFile stays in input queue. 
+ assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + @Test + public void onTriggerWithSerializationError() throws Exception { + configure(processor, 1); + processor.setGenerateSerializationError(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + Map<String, Object> user1 = new HashMap<String, Object>() { + { + put("name", "Joe"); + put("favorite_number", 146); + } + }; + runner.enqueue(createAvroRecord(Collections.singletonList(user1))); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 1); + } + + @Test + public void onTriggerWithSerializationErrorRollbackOnFailure() throws Exception { + configure(processor, 1); + processor.setGenerateSerializationError(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + Map<String, Object> user1 = new HashMap<String, Object>() { + { + put("name", "Joe"); + put("favorite_number", 146); + } + }; + runner.enqueue(createAvroRecord(Collections.singletonList(user1))); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + // Assert incoming FlowFile stays in input queue. + assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + @Test + public void onTriggerWithCommitFailure() throws Exception { + configure(processor, 1); + processor.setGenerateCommitFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "false"); + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 1); + } + + @Test + public void onTriggerWithCommitFailureRollbackOnFailure() throws Exception { + configure(processor, 1); + processor.setGenerateCommitFailure(true); + runner.setProperty(PutHive3Streaming.METASTORE_URI, "thrift://localhost:9083"); + runner.setProperty(PutHive3Streaming.DB_NAME, "default"); + runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); + runner.setProperty(PutHive3Streaming.ROLLBACK_ON_FAILURE, "true"); + runner.enqueue(new byte[0]); + try { + runner.run(); + fail("ProcessException should be thrown"); + } catch (AssertionError e) { + assertTrue(e.getCause() instanceof ProcessException); + } + + runner.assertTransferCount(PutHive3Streaming.REL_FAILURE, 0); + runner.assertTransferCount(PutHive3Streaming.REL_SUCCESS, 0); + runner.assertTransferCount(PutHive3Streaming.REL_RETRY, 0); + // Assert incoming FlowFile stays in input queue. 
+ assertEquals(1, runner.getQueueSize().getObjectCount()); + } + + @Test + public void cleanup() { + processor.cleanup(); + } + + private byte[] createAvroRecord(List<Map<String, Object>> records) throws IOException { + final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc")); + + List<GenericRecord> users = new LinkedList<>(); + for (Map<String, Object> record : records) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", record.get("name")); + user.put("favorite_number", record.get("favorite_number")); + user.put("favorite_color", record.get("favorite_color")); + users.add(user); + } + final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, out); + for (final GenericRecord user : users) { + dataFileWriter.append(user); + } + } + return out.toByteArray(); + + } + + private class MockPutHive3Streaming extends PutHive3Streaming { + + private boolean generateConnectFailure = false; + private boolean generateWriteFailure = false; + private boolean generateSerializationError = false; + private boolean generateCommitFailure = false; + private List<FieldSchema> schema = Arrays.asList( + new FieldSchema("name", serdeConstants.STRING_TYPE_NAME, ""), + new FieldSchema("favorite_number", serdeConstants.INT_TYPE_NAME, ""), + new FieldSchema("favorite_color", serdeConstants.STRING_TYPE_NAME, ""), + new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "") + ); + + @Override + StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException { + + if (generateConnectFailure) { + throw new StubConnectionError("Unit Test - Connection Error"); + } + + HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(reader, getLogger()); + MockHiveStreamingConnection hiveConnection = new MockHiveStreamingConnection(options, reader, hiveRecordWriter, schema); + hiveConnection.setGenerateWriteFailure(generateWriteFailure); + hiveConnection.setGenerateSerializationError(generateSerializationError); + hiveConnection.setGenerateCommitFailure(generateCommitFailure); + return hiveConnection; + } + + void setGenerateConnectFailure(boolean generateConnectFailure) { + this.generateConnectFailure = generateConnectFailure; + } + + void setGenerateWriteFailure(boolean generateWriteFailure) { + this.generateWriteFailure = generateWriteFailure; + } + + void setGenerateSerializationError(boolean generateSerializationError) { + this.generateSerializationError = generateSerializationError; + } + + void setGenerateCommitFailure(boolean generateCommitFailure) { + this.generateCommitFailure = generateCommitFailure; + } + + void setFields(List<FieldSchema> schema) { + this.schema = schema; + } + } + + private class MockHiveStreamingConnection implements StreamingConnection { + + private boolean generateWriteFailure = false; + private boolean generateSerializationError = false; + private boolean generateCommitFailure = false; + private int writeAttemptCount = 0; + private ConnectionStats connectionStats; + private HiveOptions options; + private RecordWriter writer; + private HiveConf hiveConf; + private Table table; + private String metastoreURI; + + MockHiveStreamingConnection(HiveOptions options, RecordReader reader, RecordWriter recordWriter, List<FieldSchema> schema) { + this.options = options; + metastoreURI = 
options.getMetaStoreURI(); + this.writer = recordWriter; + this.hiveConf = this.options.getHiveConf(); + connectionStats = new ConnectionStats(); + this.table = new Table(Table.getEmptyTable(options.getDatabaseName(), options.getTableName())); + this.table.setFields(schema); + StorageDescriptor sd = this.table.getSd(); + sd.setOutputFormat(OrcOutputFormat.class.getName()); + sd.setLocation(TARGET_HIVE); + } + + @Override + public HiveConf getHiveConf() { + return hiveConf; + } + + @Override + public void beginTransaction() throws StreamingException { + writer.init(this, 0, 100); + } + + @Override + public synchronized void write(byte[] record) throws StreamingException { + throw new UnsupportedOperationException(this.getClass().getName() + " does not support writing of records via bytes, only via an InputStream"); + } + + @Override + public void write(InputStream inputStream) throws StreamingException { + try { + if (generateWriteFailure) { + throw new StubStreamingIOFailure("Unit Test - Streaming IO Failure"); + } + if (generateSerializationError) { + throw new StubSerializationError("Unit Test - Serialization error", new Exception()); + } + this.writer.write(writeAttemptCount, inputStream); + } finally { + writeAttemptCount++; + } + } + + @Override + public void commitTransaction() throws StreamingException { + if (generateCommitFailure) { + throw new StubTransactionError("Unit Test - Commit Failure"); + } + connectionStats.incrementCommittedTransactions(); + } + + @Override + public void abortTransaction() throws StreamingException { + connectionStats.incrementAbortedTransactions(); + } + + @Override + public void close() { + // closing the connection shouldn't throw an exception + } + + @Override + public ConnectionStats getConnectionStats() { + return connectionStats; + } + + public void setGenerateWriteFailure(boolean generateWriteFailure) { + this.generateWriteFailure = generateWriteFailure; + } + + public void setGenerateSerializationError(boolean generateSerializationError) { + this.generateSerializationError = generateSerializationError; + } + + public void setGenerateCommitFailure(boolean generateCommitFailure) { + this.generateCommitFailure = generateCommitFailure; + } + + @Override + public String getMetastoreUri() { + return metastoreURI; + } + + @Override + public Table getTable() { + return table; + } + + @Override + public List<String> getStaticPartitionValues() { + return null; + } + + @Override + public boolean isPartitionedTable() { + return false; + } + + @Override + public boolean isDynamicPartitioning() { + return false; + } + + @Override + public String getAgentInfo() { + return null; + } + + @Override + public PartitionInfo createPartitionIfNotExists(List<String> list) throws StreamingException { + return null; + } + } + + private static class MockKerberosCredentialsService implements KerberosCredentialsService, ControllerService { + + private String keytab = "src/test/resources/fake.keytab"; + private String principal = "t...@realm.com"; + + public MockKerberosCredentialsService() { + } + + public MockKerberosCredentialsService(String keytab, String principal) { + this.keytab = keytab; + this.principal = principal; + } + + @Override + public String getKeytab() { + return keytab; + } + + @Override + public String getPrincipal() { + return principal; + } + + @Override + public void initialize(ControllerServiceInitializationContext context) throws InitializationException { + + } + + @Override + public Collection<ValidationResult> validate(ValidationContext context) { 
+ return Collections.emptyList(); + } + + @Override + public PropertyDescriptor getPropertyDescriptor(String name) { + return null; + } + + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + + } + + @Override + public List<PropertyDescriptor> getPropertyDescriptors() { + return null; + } + + @Override + public String getIdentifier() { + return "kcs"; + } + } +}
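Note on the test fixture: the tests above read their record schema from src/test/resources/user.avsc, which is not included in this diff. A minimal schema consistent with the fields declared by MockPutHive3Streaming (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) and with the null checks in assertOutputAvroRecords would look roughly like the sketch below; the record name, namespace, and exact nullability are assumptions, and the actual resource in the repository may differ.

{
  "namespace": "nifi",
  "name": "user",
  "type": "record",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": "int" },
    { "name": "favorite_color", "type": ["null", "string"], "default": null },
    { "name": "scale", "type": ["null", "double"], "default": null }
  ]
}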
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java new file mode 100644 index 0000000..50e83ac --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestSelectHive3QL.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hive; + +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.dbcp.DBCPService; +import org.apache.nifi.dbcp.hive.Hive3DBCPService; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.hive.HiveJdbcCommon; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.apache.nifi.processors.hive.SelectHive3QL.HIVEQL_OUTPUT_FORMAT; +import static org.apache.nifi.util.hive.HiveJdbcCommon.AVRO; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV; +import static org.apache.nifi.util.hive.HiveJdbcCommon.CSV_MIME_TYPE; +import static org.apache.nifi.util.hive.HiveJdbcCommon.MIME_TYPE_AVRO_BINARY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSelectHive3QL { + + private static final Logger LOGGER; + private final static String MAX_ROWS_KEY = "maxRows"; + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); + 
System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.SelectHive3QL", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.hive.TestSelectHive3QL", "debug"); + LOGGER = LoggerFactory.getLogger(TestSelectHive3QL.class); + } + + private final static String DB_LOCATION = "target/db"; + + private final static String QUERY_WITH_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + " from persons PER" + + " where PER.ID > ${person.id}"; + + private final static String QUERY_WITHOUT_EL = "select " + + " PER.ID as PersonId, PER.NAME as PersonName, PER.CODE as PersonCode" + + " from persons PER" + + " where PER.ID > 10"; + + + @BeforeClass + public static void setupClass() { + System.setProperty("derby.stream.error.file", "target/derby.log"); + } + + private TestRunner runner; + + @Before + public void setup() throws InitializationException { + final DBCPService dbcp = new DBCPServiceSimpleImpl(); + final Map<String, String> dbcpProperties = new HashMap<>(); + + runner = TestRunners.newTestRunner(SelectHive3QL.class); + runner.addControllerService("dbcp", dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(SelectHive3QL.HIVE_DBCP_SERVICE, "dbcp"); + } + + @Test + public void testIncomingConnectionWithNoFlowFile() throws InitializationException { + runner.setIncomingConnection(true); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM persons"); + runner.run(); + runner.assertTransferCount(SelectHive3QL.REL_SUCCESS, 0); + runner.assertTransferCount(SelectHive3QL.REL_FAILURE, 0); + } + + @Test + public void testNoIncomingConnection() throws ClassNotFoundException, SQLException, InitializationException, IOException { + runner.setIncomingConnection(false); + invokeOnTrigger(QUERY_WITHOUT_EL, false, "Avro"); + } + + @Test + public void testNoTimeLimit() throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITH_EL, true, "Avro"); + } + + + @Test + public void testWithNullIntColumn() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NULL_INT"); + } catch (final SQLException sqle) { + // Nothing to do, probably means the table didn't exist + } + + stmt.execute("create table TEST_NULL_INT (id integer not null, val1 integer, val2 integer, constraint my_pk primary key (id))"); + + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (0, NULL, 1)"); + stmt.execute("insert into TEST_NULL_INT (id, val1, val2) VALUES (1, 1, 1)"); + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_NULL_INT"); + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(0).assertAttributeEquals(SelectHive3QL.RESULT_ROW_COUNT, "2"); + } + + @Test + public void testWithSqlException() throws SQLException { + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to 
database + final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_NO_ROWS"); + } catch (final SQLException sqle) { + // Nothing to do, probably means the table didn't exist + } + + stmt.execute("create table TEST_NO_ROWS (id integer)"); + + runner.setIncomingConnection(false); + // Try a valid SQL statement that will generate an error (e.g. column val1 does not exist) + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT val1 FROM TEST_NO_ROWS"); + runner.run(); + + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1); + } + + @Test + public void testWithBadSQL() throws SQLException { + final String BAD_SQL = "create table TEST_NO_ROWS (id integer)"; + + // Test with incoming flow file (it should be routed to failure intact, i.e. same content and no parent) + runner.setIncomingConnection(true); + // Enqueue a statement that is not a valid SELECT query; it should be routed to failure + runner.enqueue(BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1); + MockFlowFile flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_FAILURE).get(0); + flowFile.assertContentEquals(BAD_SQL); + flowFile.assertAttributeEquals("parentIds", null); + runner.clearTransferState(); + + // Test with no incoming flow file (an empty flow file is transferred) + runner.setIncomingConnection(false); + // Set the same non-SELECT statement as the query property; it should also be routed to failure + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, BAD_SQL); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_FAILURE, 1); + flowFile = runner.getFlowFilesForRelationship(SelectHive3QL.REL_FAILURE).get(0); + flowFile.assertContentEquals(""); + } + + @Test + public void invokeOnTriggerWithCsv() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, CSV); + } + + @Test + public void invokeOnTriggerWithAvro() + throws InitializationException, ClassNotFoundException, SQLException, IOException { + invokeOnTrigger(QUERY_WITHOUT_EL, false, AVRO); + } + + public void invokeOnTrigger(final String query, final boolean incomingFlowFile, String outputFormat) + throws InitializationException, ClassNotFoundException, SQLException, IOException { + + // remove previous test database, if any + final File dbLocation = new File(DB_LOCATION); + dbLocation.delete(); + + // load test data to database + final Connection con = ((Hive3DBCPService) runner.getControllerService("dbcp")).getConnection(); + final Statement stmt = con.createStatement(); + try { + stmt.execute("drop table persons"); + } catch (final SQLException sqle) { + // Nothing to do here, the table didn't exist + } + + stmt.execute("create table persons (id integer, name varchar(100), code integer)"); + Random rng = new Random(53496); + final int nrOfRows = 100; + stmt.executeUpdate("insert into persons values (1, 'Joe Smith', " + rng.nextInt(469947) + ")"); + for (int i = 2; i < nrOfRows; i++) { + stmt.executeUpdate("insert into persons values (" + i + ", 'Someone Else', " + rng.nextInt(469947) + ")"); + } + stmt.executeUpdate("insert into persons values (" + nrOfRows + ", 'Last Person', NULL)"); + + LOGGER.info("test data loaded"); + + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, query); + runner.setProperty(HIVEQL_OUTPUT_FORMAT, outputFormat); + + if (incomingFlowFile) { + // incoming FlowFile
content is not used, but attributes are used + final Map<String, String> attributes = new HashMap<>(); + attributes.put("person.id", "10"); + runner.enqueue("Hello".getBytes(), attributes); + } + + runner.setIncomingConnection(incomingFlowFile); + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + + final List<MockFlowFile> flowfiles = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS); + MockFlowFile flowFile = flowfiles.get(0); + final InputStream in = new ByteArrayInputStream(flowFile.toByteArray()); + long recordsFromStream = 0; + if (AVRO.equals(outputFormat)) { + assertEquals(MIME_TYPE_AVRO_BINARY, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream++; + } + } + } else { + assertEquals(CSV_MIME_TYPE, flowFile.getAttribute(CoreAttributes.MIME_TYPE.key())); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + + String headerRow = br.readLine(); + // Derby capitalizes column names + assertEquals("PERSONID,PERSONNAME,PERSONCODE", headerRow); + + // Validate rows + String line; + while ((line = br.readLine()) != null) { + recordsFromStream++; + String[] values = line.split(","); + if (recordsFromStream < (nrOfRows - 10)) { + assertEquals(3, values.length); + assertTrue(values[1].startsWith("\"")); + assertTrue(values[1].endsWith("\"")); + } else { + assertEquals(2, values.length); // Last value (code) is null, so split() drops the trailing empty field + } + } + } + assertEquals(nrOfRows - 10, recordsFromStream); + assertEquals(recordsFromStream, Integer.parseInt(flowFile.getAttribute(SelectHive3QL.RESULT_ROW_COUNT))); + flowFile.assertAttributeEquals(AbstractHive3QLProcessor.ATTR_INPUT_TABLES, "persons"); + } + + @Test + public void testMaxRowsPerFlowFileAvro() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO); + runner.setVariable(MAX_ROWS_KEY, "9"); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12); + + //ensure all but the last
file have 9 records each + for (int ff = 0; ff < 11; ff++) { + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + } + + //last file should have 1 record + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(1, getNumberOfRecordsFromStream(in)); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + runner.clearTransferState(); + } + + @Test + public void testParametrizedQuery() throws ClassNotFoundException, SQLException, InitializationException, IOException { + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(true); + runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + "}"); + runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.AVRO); + runner.setVariable(MAX_ROWS_KEY, "9"); + + Map<String, String> attributes = new HashMap<String, String>(); + attributes.put("hiveql.args.1.value", "1"); + attributes.put("hiveql.args.1.type", String.valueOf(Types.INTEGER)); + runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE WHERE id = ?", attributes ); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + @Test + public void testMaxRowsPerFlowFileCSV() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(true); + runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "${" + MAX_ROWS_KEY + 
"}"); + runner.setProperty(SelectHive3QL.HIVEQL_OUTPUT_FORMAT, HiveJdbcCommon.CSV); + + runner.enqueue("SELECT * FROM TEST_QUERY_DB_TABLE", new HashMap<String, String>() {{ + put(MAX_ROWS_KEY, "9"); + }}); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, 12); + + //ensure all but the last file have 9 records (10 lines = 9 records + header) each + for (int ff = 0; ff < 11; ff++) { + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(ff); + in = new ByteArrayInputStream(mff.toByteArray()); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + assertEquals(10, br.lines().count()); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(ff), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + } + + //last file should have 1 record (2 lines = 1 record + header) + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(11); + in = new ByteArrayInputStream(mff.toByteArray()); + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + assertEquals(2, br.lines().count()); + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(11), mff.getAttribute("fragment.index")); + assertEquals("12", mff.getAttribute("fragment.count")); + runner.clearTransferState(); + } + + @Test + public void testMaxRowsPerFlowFileWithMaxFragments() throws ClassNotFoundException, SQLException, InitializationException, IOException { + + // load test data to database + final Connection con = ((DBCPService) runner.getControllerService("dbcp")).getConnection(); + Statement stmt = con.createStatement(); + InputStream in; + MockFlowFile mff; + + try { + stmt.execute("drop table TEST_QUERY_DB_TABLE"); + } catch (final SQLException sqle) { + // Ignore this error, probably a "table does not exist" since Derby doesn't yet support DROP IF EXISTS [DERBY-4842] + } + + stmt.execute("create table TEST_QUERY_DB_TABLE (id integer not null, name varchar(100), scale float, created_on timestamp, bignum bigint default 0)"); + int rowCount = 0; + //create larger row set + for (int batch = 0; batch < 100; batch++) { + stmt.execute("insert into TEST_QUERY_DB_TABLE (id, name, scale, created_on) VALUES (" + rowCount + ", 'Joe Smith', 1.0, '1962-09-23 03:23:34.234')"); + rowCount++; + } + + runner.setIncomingConnection(false); + runner.setProperty(SelectHive3QL.HIVEQL_SELECT_QUERY, "SELECT * FROM TEST_QUERY_DB_TABLE"); + runner.setProperty(SelectHive3QL.MAX_ROWS_PER_FLOW_FILE, "9"); + Integer maxFragments = 3; + runner.setProperty(SelectHive3QL.MAX_FRAGMENTS, maxFragments.toString()); + + runner.run(); + runner.assertAllFlowFilesTransferred(SelectHive3QL.REL_SUCCESS, maxFragments); + + for (int i = 0; i < maxFragments; i++) { + mff = runner.getFlowFilesForRelationship(SelectHive3QL.REL_SUCCESS).get(i); + in = new ByteArrayInputStream(mff.toByteArray()); + assertEquals(9, getNumberOfRecordsFromStream(in)); + + mff.assertAttributeExists("fragment.identifier"); + assertEquals(Integer.toString(i), mff.getAttribute("fragment.index")); + assertEquals(maxFragments.toString(), mff.getAttribute("fragment.count")); + } + + runner.clearTransferState(); + } + + private long getNumberOfRecordsFromStream(InputStream in) throws IOException { + final DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + try (DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, datumReader)) { + GenericRecord record = null; + long recordsFromStream 
= 0; + while (dataFileReader.hasNext()) { + // Reuse record object by passing it to next(). This saves us from + // allocating and garbage collecting many objects for files with + // many items. + record = dataFileReader.next(record); + recordsFromStream += 1; + } + + return recordsFromStream; + } + } + + /** + * Simple implementation only for SelectHive3QL processor testing. + */ + private class DBCPServiceSimpleImpl extends AbstractControllerService implements Hive3DBCPService { + + @Override + public String getIdentifier() { + return "dbcp"; + } + + @Override + public Connection getConnection() throws ProcessException { + try { + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + return DriverManager.getConnection("jdbc:derby:" + DB_LOCATION + ";create=true"); + } catch (final Exception e) { + throw new ProcessException("getConnection failed: " + e); + } + } + + @Override + public String getConnectionURL() { + return "jdbc:derby:" + DB_LOCATION + ";create=true"; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java new file mode 100644 index 0000000..e1af5a1 --- /dev/null +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/nifi/blob/da99f873/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
new file mode 100644
index 0000000..e1af5a1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/orc/PutORCTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.orc;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.apache.hadoop.hive.ql.io.orc.RecordReader;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.exception.FailureException;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+public class PutORCTest {
+
+    private static final String DIRECTORY = "target";
+    private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
+
+    private Schema schema;
+    private Configuration testConf;
+    private PutORC proc;
+    private TestRunner testRunner;
+
+    @BeforeClass
+    public static void setupLogging() {
+        BasicConfigurator.configure();
+    }
+
+    @Before
+    public void setup() throws IOException {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        testConf = new Configuration();
+        testConf.addResource(new Path(TEST_CONF_PATH));
+
+        proc = new PutORC();
+    }
+
+    private void configure(final PutORC putORC, final int numUsers) throws InitializationException {
+        configure(putORC, numUsers, null);
+    }
+
+    private void configure(final PutORC putORC, final int numUsers, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
+        testRunner = TestRunners.newTestRunner(putORC);
+        testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY);
+
+        MockRecordParser readerFactory = new MockRecordParser();
+
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType(), recordField.isNullable());
+        }
+
+        if (recordGenerator == null) {
+            for (int i = 0; i < numUsers; i++) {
+                readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
+            }
+        } else {
+            recordGenerator.apply(numUsers, readerFactory);
+        }
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+    }
+
+    @Test
+    public void testWriteORCWithDefaults() throws IOException, InitializationException {
+        configure(proc, 100);
+
+        final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+        final Path orcFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the successful flow file has the expected attributes
+        final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
+        mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, orcFile.getParent().toString());
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
+        mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
+        mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
+                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (name STRING, favorite_number INT, favorite_color STRING, scale DOUBLE) STORED AS ORC");
+
+        // verify we generated a provenance event
+        final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
+        assertEquals(1, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
+
+        // verify the content of the ORC file by reading it back in
+        verifyORCUsers(orcFile, 100);
+
+        // verify we don't have the temp dot file after success
+        final File tempOrcFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempOrcFile.exists());
+
+        // verify we DO have the CRC file after success
+        final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + ".crc");
+        Assert.assertTrue(crcAvroORCFile.exists());
+    }
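The last two assertions lean on Hadoop local-filesystem behavior: PutORC writes to a temporary dot file that is renamed away on success, and the local FileSystem is checksummed, so each write leaves a ".<filename>.crc" sidecar behind. A standalone sketch of that effect (illustrative only; the class name and file path here are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: Hadoop's LocalFileSystem wraps RawLocalFileSystem in a
    // ChecksumFileSystem, which is why a ".<name>.crc" file appears next to
    // anything written through it.
    public class LocalCrcSketch {
        public static void main(final String[] args) throws Exception {
            final FileSystem fs = FileSystem.getLocal(new Configuration());
            try (FSDataOutputStream out = fs.create(new Path("target/example"), true)) {
                out.writeBytes("some data");
            }
            // On the local filesystem this prints "true":
            System.out.println(new java.io.File("target/.example.crc").exists());
        }
    }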
+
+    @Test
+    public void testWriteORCWithAvroLogicalTypes() throws IOException, InitializationException {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/user_logical_types.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+        Calendar now = Calendar.getInstance();
+        LocalTime nowTime = LocalTime.now();
+        LocalDateTime nowDateTime = LocalDateTime.now();
+        LocalDate epoch = LocalDate.ofEpochDay(0);
+        LocalDate nowDate = LocalDate.now();
+
+        final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
+        final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
+        final Date dt = Date.valueOf(nowDate);
+        final double dec = 1234.56;
+
+        configure(proc, 10, (numUsers, readerFactory) -> {
+            for (int i = 0; i < numUsers; i++) {
+                readerFactory.addRecord(
+                        i,
+                        timeMillis,
+                        timestampMillis,
+                        dt,
+                        dec);
+            }
+            return null;
+        });
+
+        final String filename = "testORCWithAvroLogicalTypes-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+
+        final Path orcFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the successful flow file has the expected attributes
+        final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
+        mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, orcFile.getParent().toString());
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
+        mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
+        // DDL will be created with field names normalized for Hive (e.g. lowercased) by default
+        mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
+                "CREATE EXTERNAL TABLE IF NOT EXISTS myTable (id INT, timemillis INT, timestampmillis TIMESTAMP, dt DATE, dec DOUBLE) STORED AS ORC");
+
+        // verify we generated a provenance event
+        final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
+        assertEquals(1, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
+        Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
+
+        // verify the content of the ORC file by reading it back in
+        verifyORCUsers(orcFile, 10, (x, currUser) -> {
+                    assertEquals((int) currUser, ((IntWritable) x.get(0)).get());
+                    assertEquals(timeMillis, ((IntWritable) x.get(1)).get());
+                    assertEquals(timestampMillis, ((TimestampWritable) x.get(2)).getTimestamp());
+                    assertEquals(dt, ((DateWritable) x.get(3)).get());
+                    assertEquals(dec, ((DoubleWritable) x.get(4)).get(), Double.MIN_VALUE);
+                    return null;
+                }
+        );
+
+        // verify we don't have the temp dot file after success
+        final File tempOrcFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempOrcFile.exists());
+
+        // verify we DO have the CRC file after success
+        final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + ".crc");
+        Assert.assertTrue(crcAvroORCFile.exists());
+    }
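The contents of user_logical_types.avsc are not part of this patch, but the DDL asserted above implies a schema along these lines: time-millis surfaces as INT in the generated Hive DDL, timestamp-millis as TIMESTAMP, date as DATE, and the "dec" field as DOUBLE. A hypothetical sketch (not necessarily the real file):

    // Hypothetical sketch of a logical-types schema in the spirit of
    // user_logical_types.avsc; the actual resource file is not shown here.
    final Schema logicalTypesSchema = new Schema.Parser().parse(
            "{\"type\": \"record\", \"name\": \"UserLogicalTypes\", \"fields\": ["
                    + "{\"name\": \"id\", \"type\": \"int\"},"
                    + "{\"name\": \"timemillis\", \"type\": {\"type\": \"int\", \"logicalType\": \"time-millis\"}},"
                    + "{\"name\": \"timestampmillis\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}},"
                    + "{\"name\": \"dt\", \"type\": {\"type\": \"int\", \"logicalType\": \"date\"}},"
                    + "{\"name\": \"dec\", \"type\": \"double\"}"
                    + "]}");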
+
+    @Test
+    public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException {
+        configure(proc, 10);
+
+        final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
+
+        // provide my.schema as an attribute
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", schema.toString());
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
+        final org.apache.nifi.serialization.RecordReader recordReader = Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
+        when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR"));
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException {
+        final PutORC proc = new PutORC() {
+            @Override
+            public HDFSRecordWriter createHDFSRecordWriter(ProcessContext context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
+                    throws IOException {
+                throw new IOException("IOException");
+            }
+        };
+        configure(proc, 0);
+
+        final String filename = "testIOExceptionCreatingWriterShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionFromReaderShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
+        final RecordSet recordSet = Mockito.mock(RecordSet.class);
+        when(recordSet.next()).thenThrow(new IOException("ERROR"));
+
+        final org.apache.nifi.serialization.RecordReader recordReader = Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
+        when(recordReader.createRecordSet()).thenReturn(recordSet);
+        when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testIOExceptionFromReaderShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException {
+        final PutORC proc = new PutORC() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
+                    throws IOException, InterruptedException, FailureException {
+                throw new IOException("IOException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testIOExceptionRenamingShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String, String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
+
+        // verify the temp dot file was cleaned up after the failed rename
+        final File tempAvroORCFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroORCFile.exists());
+    }
+
+    private void verifyORCUsers(final Path orcUsers, final int numExpectedUsers) throws IOException {
+        verifyORCUsers(orcUsers, numExpectedUsers, null);
+    }
+
+    private void verifyORCUsers(final Path orcUsers, final int numExpectedUsers, BiFunction<List<Object>, Integer, Void> assertFunction) throws IOException {
+        Reader reader = OrcFile.createReader(orcUsers, OrcFile.readerOptions(testConf));
+        RecordReader recordReader = reader.rows();
+
+        TypeInfo typeInfo =
+                TypeInfoUtils.getTypeInfoFromTypeString("struct<name:string,favorite_number:int,favorite_color:string,scale:double>");
+        StructObjectInspector inspector = (StructObjectInspector)
+                OrcStruct.createObjectInspector(typeInfo);
+
+        int currUser = 0;
+        Object nextRecord = null;
+        while ((nextRecord = recordReader.next(nextRecord)) != null) {
+            Assert.assertNotNull(nextRecord);
+            Assert.assertTrue("Not an OrcStruct", nextRecord instanceof OrcStruct);
+            List<Object> x = inspector.getStructFieldsDataAsList(nextRecord);
+
+            if (assertFunction == null) {
+                assertEquals("name" + currUser, x.get(0).toString());
+                assertEquals(currUser, ((IntWritable) x.get(1)).get());
+                assertEquals("blue" + currUser, x.get(2).toString());
+                assertEquals(10.0 * currUser, ((DoubleWritable) x.get(3)).get(), Double.MIN_VALUE);
+            } else {
+                assertFunction.apply(x, currUser);
+            }
+            currUser++;
+        }
+
+        assertEquals(numExpectedUsers, currUser);
+    }
+
+}
\ No newline at end of file
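Taken together, these tests pin down PutORC's routing contract: a MalformedRecordException from the reader routes to failure, while IOExceptions (creating the writer, reading records, or renaming the result) route to retry. They also all repeat the same enqueue boilerplate; a small helper along these lines (hypothetical, not part of the patch) would cut the duplication:

    // Hypothetical helper, not in the commit above -- shown only to factor out
    // the enqueue pattern each test repeats.
    private void enqueueTrigger(final String filename) {
        final Map<String, String> attrs = new HashMap<>();
        attrs.put(CoreAttributes.FILENAME.key(), filename);
        testRunner.enqueue("trigger", attrs);
    }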