Repository: incubator-nifi
Updated Branches:
  refs/heads/develop e93c82919 -> b2a1f5217


NIFI-271


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b2a1f521
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b2a1f521
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b2a1f521

Branch: refs/heads/develop
Commit: b2a1f5217dd89e8133887d691e56483e9f0e4214
Parents: e93c829
Author: joewitt <[email protected]>
Authored: Sat Apr 25 09:15:36 2015 -0400
Committer: joewitt <[email protected]>
Committed: Sat Apr 25 09:15:36 2015 -0400

----------------------------------------------------------------------
 .../processors/kite/AbstractKiteProcessor.java  |  10 +-
 .../apache/nifi/processors/kite/AvroUtil.java   |  18 +-
 .../nifi/processors/kite/ConvertCSVToAvro.java  | 348 +++++++++----------
 .../nifi/processors/kite/ConvertJSONToAvro.java |   2 +-
 .../processors/kite/StoreInKiteDataset.java     | 208 +++++------
 .../processors/kite/TestCSVToAvroProcessor.java | 181 +++++-----
 .../kite/TestConfigurationProperty.java         |  65 ++--
 .../nifi/processors/kite/TestGetSchema.java     | 117 ++++---
 .../kite/TestJSONToAvroProcessor.java           |  58 ++--
 .../kite/TestKiteProcessorsCluster.java         | 125 ++++---
 .../kite/TestKiteStorageProcessor.java          | 225 ++++++------
 .../apache/nifi/processors/kite/TestUtil.java   | 114 +++---
 12 files changed, 732 insertions(+), 739 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
index 56418f4..fec8239 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteProcessor.java
@@ -81,7 +81,7 @@ abstract class AbstractKiteProcessor extends 
AbstractProcessor {
         public ValidationResult validate(String subject, String uri, 
ValidationContext context) {
             String message = "not set";
             boolean isValid = true;
-            
+
             if (uri.trim().isEmpty()) {
                 isValid = false;
             } else {
@@ -95,7 +95,7 @@ abstract class AbstractKiteProcessor extends 
AbstractProcessor {
                     }
                 }
             }
-            
+
             return new ValidationResult.Builder()
                     .subject(subject)
                     .input(uri)
@@ -163,14 +163,14 @@ abstract class AbstractKiteProcessor extends 
AbstractProcessor {
         public ValidationResult validate(String subject, String uri, 
ValidationContext context) {
             Configuration conf = 
getConfiguration(context.getProperty(CONF_XML_FILES).getValue());
             String error = null;
-            
+
             final boolean elPresent = 
context.isExpressionLanguageSupported(subject) && 
context.isExpressionLanguagePresent(uri);
             if (!elPresent) {
                 try {
                     getSchema(uri, conf);
-                  } catch (SchemaNotFoundException e) {
+                } catch (SchemaNotFoundException e) {
                     error = e.getMessage();
-                  }
+                }
             }
             return new ValidationResult.Builder()
                     .subject(subject)

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
index 9ff0f73..53075c7 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroUtil.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import org.apache.avro.Schema;
@@ -24,17 +23,16 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 
-
 class AvroUtil {
 
-  @SuppressWarnings("unchecked")
-  public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> 
dClass) {
-    return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
-  }
+    @SuppressWarnings("unchecked")
+    public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> 
dClass) {
+        return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
+    }
 
-  @SuppressWarnings("unchecked")
-  public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> 
dClass) {
-    return (DatumReader<D>) GenericData.get().createDatumReader(schema);
-  }
+    @SuppressWarnings("unchecked")
+    public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> 
dClass) {
+        return (DatumReader<D>) GenericData.get().createDatumReader(schema);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index c6f58c7..564a203 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -57,204 +56,205 @@ import static 
org.apache.nifi.processor.util.StandardValidators.createLongValida
 
 @Tags({"kite", "csv", "avro"})
 @CapabilityDescription(
-    "Converts CSV files to Avro according to an Avro Schema")
+        "Converts CSV files to Avro according to an Avro Schema")
 public class ConvertCSVToAvro extends AbstractKiteProcessor {
-  private static final CSVProperties DEFAULTS = new 
CSVProperties.Builder().build();
 
-  private static final Validator CHAR_VALIDATOR = new Validator() {
-    @Override
-    public ValidationResult validate(String subject, String input,
-                                     ValidationContext context) {
-      return new ValidationResult.Builder()
-          .subject(subject)
-          .input(input)
-          .explanation("Only single characters are supported")
-          .valid(input.length() == 1)
-          .build();
-    }
-  };
+    private static final CSVProperties DEFAULTS = new 
CSVProperties.Builder().build();
 
-  private static final Relationship SUCCESS = new Relationship.Builder()
-      .name("success")
-      .description("FlowFile content has been successfully saved")
-      .build();
+    private static final Validator CHAR_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input,
+                ValidationContext context) {
+            return new ValidationResult.Builder()
+                    .subject(subject)
+                    .input(input)
+                    .explanation("Only single characters are supported")
+                    .valid(input.length() == 1)
+                    .build();
+        }
+    };
 
-  private static final Relationship FAILURE = new Relationship.Builder()
-      .name("failure")
-      .description("FlowFile content could not be processed")
-      .build();
+    private static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFile content has been successfully saved")
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor SCHEMA =
-      new PropertyDescriptor.Builder()
-          .name("Record schema")
-          .description("Outgoing Avro schema for each record created from a 
CSV row")
-          .addValidator(SCHEMA_VALIDATOR)
-          .expressionLanguageSupported(true)
-          .required(true)
-          .build();
+    private static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFile content could not be processed")
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor CHARSET =
-      new PropertyDescriptor.Builder()
-          .name("CSV charset")
-          .description("Character set for CSV files")
-          .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
-          .defaultValue(DEFAULTS.charset)
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor SCHEMA
+            = new PropertyDescriptor.Builder()
+            .name("Record schema")
+            .description("Outgoing Avro schema for each record created from a 
CSV row")
+            .addValidator(SCHEMA_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor DELIMITER =
-      new PropertyDescriptor.Builder()
-          .name("CSV delimiter")
-          .description("Delimiter character for CSV records")
-          .addValidator(CHAR_VALIDATOR)
-          .defaultValue(DEFAULTS.delimiter)
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor CHARSET
+            = new PropertyDescriptor.Builder()
+            .name("CSV charset")
+            .description("Character set for CSV files")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .defaultValue(DEFAULTS.charset)
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor QUOTE =
-      new PropertyDescriptor.Builder()
-          .name("CSV quote character")
-          .description("Quote character for CSV values")
-          .addValidator(CHAR_VALIDATOR)
-          .defaultValue(DEFAULTS.quote)
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor DELIMITER
+            = new PropertyDescriptor.Builder()
+            .name("CSV delimiter")
+            .description("Delimiter character for CSV records")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.delimiter)
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor ESCAPE =
-      new PropertyDescriptor.Builder()
-          .name("CSV escape character")
-          .description("Escape character for CSV values")
-          .addValidator(CHAR_VALIDATOR)
-          .defaultValue(DEFAULTS.escape)
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor QUOTE
+            = new PropertyDescriptor.Builder()
+            .name("CSV quote character")
+            .description("Quote character for CSV values")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.quote)
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor HAS_HEADER =
-      new PropertyDescriptor.Builder()
-          .name("Use CSV header line")
-          .description("Whether to use the first line as a header")
-          .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-          .defaultValue(String.valueOf(DEFAULTS.useHeader))
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor ESCAPE
+            = new PropertyDescriptor.Builder()
+            .name("CSV escape character")
+            .description("Escape character for CSV values")
+            .addValidator(CHAR_VALIDATOR)
+            .defaultValue(DEFAULTS.escape)
+            .build();
 
-  @VisibleForTesting
-  static final PropertyDescriptor LINES_TO_SKIP =
-      new PropertyDescriptor.Builder()
-          .name("Lines to skip")
-          .description("Number of lines to skip before reading header or data")
-          .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
-          .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor HAS_HEADER
+            = new PropertyDescriptor.Builder()
+            .name("Use CSV header line")
+            .description("Whether to use the first line as a header")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .defaultValue(String.valueOf(DEFAULTS.useHeader))
+            .build();
 
-  private static final List<PropertyDescriptor> PROPERTIES =
-      ImmutableList.<PropertyDescriptor>builder()
-          .addAll(AbstractKiteProcessor.getProperties())
-          .add(SCHEMA)
-          .add(CHARSET)
-          .add(DELIMITER)
-          .add(QUOTE)
-          .add(ESCAPE)
-          .add(HAS_HEADER)
-          .add(LINES_TO_SKIP)
-          .build();
+    @VisibleForTesting
+    static final PropertyDescriptor LINES_TO_SKIP
+            = new PropertyDescriptor.Builder()
+            .name("Lines to skip")
+            .description("Number of lines to skip before reading header or 
data")
+            .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
+            .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
+            .build();
 
-  private static final Set<Relationship> RELATIONSHIPS =
-      ImmutableSet.<Relationship>builder()
-          .add(SUCCESS)
-          .add(FAILURE)
-          .build();
+    private static final List<PropertyDescriptor> PROPERTIES
+            = ImmutableList.<PropertyDescriptor>builder()
+            .addAll(AbstractKiteProcessor.getProperties())
+            .add(SCHEMA)
+            .add(CHARSET)
+            .add(DELIMITER)
+            .add(QUOTE)
+            .add(ESCAPE)
+            .add(HAS_HEADER)
+            .add(LINES_TO_SKIP)
+            .build();
 
-  // Immutable configuration
-  @VisibleForTesting
-  volatile CSVProperties props;
+    private static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(FAILURE)
+            .build();
 
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return PROPERTIES;
-  }
+    // Immutable configuration
+    @VisibleForTesting
+    volatile CSVProperties props;
 
-  @Override
-  public Set<Relationship> getRelationships() {
-    return RELATIONSHIPS;
-  }
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
 
-  @OnScheduled
-  public void createCSVProperties(ProcessContext context) throws IOException {
-    super.setDefaultConfiguration(context);
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
 
-    this.props = new CSVProperties.Builder()
-        .charset(context.getProperty(CHARSET).getValue())
-        .delimiter(context.getProperty(DELIMITER).getValue())
-        .quote(context.getProperty(QUOTE).getValue())
-        .escape(context.getProperty(ESCAPE).getValue())
-        .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
-        .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
-        .build();
-  }
+    @OnScheduled
+    public void createCSVProperties(ProcessContext context) throws IOException 
{
+        super.setDefaultConfiguration(context);
 
-  @Override
-  public void onTrigger(ProcessContext context, final ProcessSession session)
-      throws ProcessException {
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
-      return;
+        this.props = new CSVProperties.Builder()
+                .charset(context.getProperty(CHARSET).getValue())
+                .delimiter(context.getProperty(DELIMITER).getValue())
+                .quote(context.getProperty(QUOTE).getValue())
+                .escape(context.getProperty(ESCAPE).getValue())
+                .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
+                .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
+                .build();
     }
 
-    String schemaProperty = context.getProperty(SCHEMA)
-        .evaluateAttributeExpressions(flowFile)
-        .getValue();
-    final Schema schema;
-    try {
-      schema = getSchema(schemaProperty, DefaultConfiguration.get());
-    } catch (SchemaNotFoundException e) {
-      getLogger().error("Cannot find schema: " + schemaProperty);
-      session.transfer(flowFile, FAILURE);
-      return;
-    }
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        String schemaProperty = context.getProperty(SCHEMA)
+                .evaluateAttributeExpressions(flowFile)
+                .getValue();
+        final Schema schema;
+        try {
+            schema = getSchema(schemaProperty, DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + schemaProperty);
+            session.transfer(flowFile, FAILURE);
+            return;
+        }
 
-    final DataFileWriter<Record> writer = new DataFileWriter<>(
-        AvroUtil.newDatumWriter(schema, Record.class));
-    writer.setCodec(CodecFactory.snappyCodec());
+        final DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(schema, Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
 
-    try {
-      flowFile = session.write(flowFile, new StreamCallback() {
-        @Override
-        public void process(InputStream in, OutputStream out) throws 
IOException {
-          long written = 0L;
-          long errors = 0L;
-          try (CSVFileReader<Record> reader = new CSVFileReader<>(
-              in, props, schema, Record.class)) {
-            reader.initialize();
-            try (DataFileWriter<Record> w = writer.create(schema, out)) {
-              while (reader.hasNext()) {
-                try {
-                  Record record = reader.next();
-                  w.append(record);
-                  written += 1;
-                } catch (DatasetRecordException e) {
-                  errors += 1;
+        try {
+            flowFile = session.write(flowFile, new StreamCallback() {
+                @Override
+                public void process(InputStream in, OutputStream out) throws 
IOException {
+                    long written = 0L;
+                    long errors = 0L;
+                    try (CSVFileReader<Record> reader = new CSVFileReader<>(
+                            in, props, schema, Record.class)) {
+                        reader.initialize();
+                        try (DataFileWriter<Record> w = writer.create(schema, 
out)) {
+                            while (reader.hasNext()) {
+                                try {
+                                    Record record = reader.next();
+                                    w.append(record);
+                                    written += 1;
+                                } catch (DatasetRecordException e) {
+                                    errors += 1;
+                                }
+                            }
+                        }
+                    }
+                    session.adjustCounter("Converted records", written,
+                            false /* update only if file transfer is 
successful */);
+                    session.adjustCounter("Conversion errors", errors,
+                            false /* update only if file transfer is 
successful */);
                 }
-              }
-            }
-          }
-          session.adjustCounter("Converted records", written,
-              false /* update only if file transfer is successful */);
-          session.adjustCounter("Conversion errors", errors,
-              false /* update only if file transfer is successful */);
-        }
-      });
+            });
 
-      session.transfer(flowFile, SUCCESS);
+            session.transfer(flowFile, SUCCESS);
 
-      //session.getProvenanceReporter().send(flowFile, 
target.getUri().toString());
-    } catch (ProcessException | DatasetIOException e) {
-      getLogger().error("Failed reading or writing", e);
-      session.transfer(flowFile, FAILURE);
-    } catch (DatasetException e) {
-      getLogger().error("Failed to read FlowFile", e);
-      session.transfer(flowFile, FAILURE);
+            //session.getProvenanceReporter().send(flowFile, 
target.getUri().toString());
+        } catch (ProcessException | DatasetIOException e) {
+            getLogger().error("Failed reading or writing", e);
+            session.transfer(flowFile, FAILURE);
+        } catch (DatasetException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(flowFile, FAILURE);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 7a35e31..78f80b9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -105,7 +105,7 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
         }
 
         String schemaProperty = context.getProperty(SCHEMA)
-            .evaluateAttributeExpressions(flowFile)
+                .evaluateAttributeExpressions(flowFile)
                 .getValue();
         final Schema schema;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
index 5586de1..7a30db1 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/StoreInKiteDataset.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.ImmutableList;
@@ -50,113 +49,114 @@ import org.kitesdk.data.spi.SchemaValidationUtil;
 @Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
 public class StoreInKiteDataset extends AbstractKiteProcessor {
-  private static final Relationship SUCCESS = new Relationship.Builder()
-      .name("success")
-      .description("FlowFile content has been successfully saved")
-      .build();
-
-  private static final Relationship INCOMPATIBLE = new Relationship.Builder()
-      .name("incompatible")
-      .description("FlowFile content is not compatible with the target 
dataset")
-      .build();
-
-  private static final Relationship FAILURE = new Relationship.Builder()
-      .name("failure")
-      .description("FlowFile content could not be processed")
-      .build();
-
-  public static final PropertyDescriptor KITE_DATASET_URI =
-      new PropertyDescriptor.Builder()
-          .name("Target dataset URI")
-          .description("URI that identifies a Kite dataset where data will be 
stored")
-          .addValidator(RECOGNIZED_URI)
-          .expressionLanguageSupported(true)
-          .required(true)
-          .build();
-
-  private static final List<PropertyDescriptor> PROPERTIES =
-      ImmutableList.<PropertyDescriptor>builder()
-          .addAll(AbstractKiteProcessor.getProperties())
-          .add(KITE_DATASET_URI)
-          .build();
-
-  private static final Set<Relationship> RELATIONSHIPS =
-      ImmutableSet.<Relationship>builder()
-          .add(SUCCESS)
-          .add(INCOMPATIBLE)
-          .add(FAILURE)
-          .build();
-
-  @Override
-  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-    return PROPERTIES;
-  }
-
-  @Override
-  public Set<Relationship> getRelationships() {
-    return RELATIONSHIPS;
-  }
-
-  @Override
-  public void onTrigger(ProcessContext context, final ProcessSession session)
-      throws ProcessException {
-    FlowFile flowFile = session.get();
-    if (flowFile == null) {
-      return;
-    }
 
-    final View<Record> target = load(context, flowFile);
-    final Schema schema = target.getDataset().getDescriptor().getSchema();
-
-    try {
-      StopWatch timer = new StopWatch(true);
-      session.read(flowFile, new InputStreamCallback() {
-        @Override
-        public void process(InputStream in) throws IOException {
-          try (DataFileStream<Record> stream = new DataFileStream<>(
-              in, AvroUtil.newDatumReader(schema, Record.class))) {
-            IncompatibleSchemaException.check(
-                SchemaValidationUtil.canRead(stream.getSchema(), schema),
-                "Incompatible file schema %s, expected %s",
-                stream.getSchema(), schema);
-
-            long written = 0L;
-            try (DatasetWriter<Record> writer = target.newWriter()) {
-              for (Record record : stream) {
-                writer.write(record);
-                written += 1;
-              }
-            } finally {
-              session.adjustCounter("Stored records", written,
-                  true /* cannot roll back the write */);
-            }
-          }
-        }
-      });
-      timer.stop();
+    private static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFile content has been successfully saved")
+            .build();
+
+    private static final Relationship INCOMPATIBLE = new Relationship.Builder()
+            .name("incompatible")
+            .description("FlowFile content is not compatible with the target 
dataset")
+            .build();
+
+    private static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFile content could not be processed")
+            .build();
+
+    public static final PropertyDescriptor KITE_DATASET_URI
+            = new PropertyDescriptor.Builder()
+            .name("Target dataset URI")
+            .description("URI that identifies a Kite dataset where data will 
be stored")
+            .addValidator(RECOGNIZED_URI)
+            .expressionLanguageSupported(true)
+            .required(true)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTIES
+            = ImmutableList.<PropertyDescriptor>builder()
+            .addAll(AbstractKiteProcessor.getProperties())
+            .add(KITE_DATASET_URI)
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(INCOMPATIBLE)
+            .add(FAILURE)
+            .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
 
-      session.getProvenanceReporter().send(flowFile,
-          target.getUri().toString(),
-          timer.getDuration(TimeUnit.MILLISECONDS),
-          true /* cannot roll back the write */ );
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
 
-      session.transfer(flowFile, SUCCESS);
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
 
-    } catch (ProcessException | DatasetIOException e) {
-      getLogger().error("Failed to read FlowFile", e);
-      session.transfer(flowFile, FAILURE);
+        final View<Record> target = load(context, flowFile);
+        final Schema schema = target.getDataset().getDescriptor().getSchema();
+
+        try {
+            StopWatch timer = new StopWatch(true);
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream in) throws IOException {
+                    try (DataFileStream<Record> stream = new DataFileStream<>(
+                            in, AvroUtil.newDatumReader(schema, 
Record.class))) {
+                        IncompatibleSchemaException.check(
+                                
SchemaValidationUtil.canRead(stream.getSchema(), schema),
+                                "Incompatible file schema %s, expected %s",
+                                stream.getSchema(), schema);
+
+                        long written = 0L;
+                        try (DatasetWriter<Record> writer = 
target.newWriter()) {
+                            for (Record record : stream) {
+                                writer.write(record);
+                                written += 1;
+                            }
+                        } finally {
+                            session.adjustCounter("Stored records", written,
+                                    true /* cannot roll back the write */);
+                        }
+                    }
+                }
+            });
+            timer.stop();
+
+            session.getProvenanceReporter().send(flowFile,
+                    target.getUri().toString(),
+                    timer.getDuration(TimeUnit.MILLISECONDS),
+                    true /* cannot roll back the write */);
+
+            session.transfer(flowFile, SUCCESS);
+
+        } catch (ProcessException | DatasetIOException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(flowFile, FAILURE);
+
+        } catch (ValidationException e) {
+            getLogger().error(e.getMessage());
+            getLogger().debug("Incompatible schema error", e);
+            session.transfer(flowFile, INCOMPATIBLE);
+        }
+    }
 
-    } catch (ValidationException e) {
-      getLogger().error(e.getMessage());
-      getLogger().debug("Incompatible schema error", e);
-      session.transfer(flowFile, INCOMPATIBLE);
+    private View<Record> load(ProcessContext context, FlowFile file) {
+        String uri = context.getProperty(KITE_DATASET_URI)
+                .evaluateAttributeExpressions(file)
+                .getValue();
+        return Datasets.load(uri, Record.class);
     }
-  }
-
-  private View<Record> load(ProcessContext context, FlowFile file) {
-    String uri = context.getProperty(KITE_DATASET_URI)
-        .evaluateAttributeExpressions(file)
-        .getValue();
-    return Datasets.load(uri, Record.class);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index dbe3b81..753b72b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
@@ -33,94 +32,94 @@ import static 
org.apache.nifi.processors.kite.TestUtil.streamFor;
 
 public class TestCSVToAvroProcessor {
 
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
-
-  public static final String CSV_CONTENT = "" +
-      "1,green\n" +
-      ",blue,\n" + // invalid, ID is missing
-      "2,grey,12.95";
-
-  @Test
-  public void testBasicConversion() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    runner.assertNotValid();
-    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
-    runner.assertValid();
-
-    runner.enqueue(streamFor(CSV_CONTENT));
-    runner.run();
-
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
-
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
-
-  @Test
-  public void testAlternateCharset() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
-    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
-    runner.assertValid();
-
-    runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
-    runner.run();
-
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
-
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
-
-  @Test
-  public void testCSVProperties() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
-    ConvertCSVToAvro processor = new ConvertCSVToAvro();
-    ProcessContext context = runner.getProcessContext();
-
-    // check defaults
-    processor.createCSVProperties(context);
-    Assert.assertEquals("Charset should match",
-        "utf8", processor.props.charset);
-    Assert.assertEquals("Delimiter should match",
-        ",", processor.props.delimiter);
-    Assert.assertEquals("Quote should match",
-        "\"", processor.props.quote);
-    Assert.assertEquals("Escape should match",
-        "\\", processor.props.escape);
-    Assert.assertEquals("Header flag should match",
-        false, processor.props.useHeader);
-    Assert.assertEquals("Lines to skip should match",
-        0, processor.props.linesToSkip);
-
-    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
-    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
-    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
-    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
-    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
-    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
-
-    // check updates
-    processor.createCSVProperties(context);
-    Assert.assertEquals("Charset should match",
-        "utf16", processor.props.charset);
-    Assert.assertEquals("Delimiter should match",
-        "|", processor.props.delimiter);
-    Assert.assertEquals("Quote should match",
-        "'", processor.props.quote);
-    Assert.assertEquals("Escape should match",
-        "\u2603", processor.props.escape);
-    Assert.assertEquals("Header flag should match",
-        true, processor.props.useHeader);
-    Assert.assertEquals("Lines to skip should match",
-        2, processor.props.linesToSkip);
-  }
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
+
+    public static final String CSV_CONTENT = ""
+            + "1,green\n"
+            + ",blue,\n" + // invalid, ID is missing
+            "2,grey,12.95";
+
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(CSV_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
+
+    @Test
+    public void testAlternateCharset() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+        runner.assertValid();
+
+        runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
+
+    @Test
+    public void testCSVProperties() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        ConvertCSVToAvro processor = new ConvertCSVToAvro();
+        ProcessContext context = runner.getProcessContext();
+
+        // check defaults
+        processor.createCSVProperties(context);
+        Assert.assertEquals("Charset should match",
+                "utf8", processor.props.charset);
+        Assert.assertEquals("Delimiter should match",
+                ",", processor.props.delimiter);
+        Assert.assertEquals("Quote should match",
+                "\"", processor.props.quote);
+        Assert.assertEquals("Escape should match",
+                "\\", processor.props.escape);
+        Assert.assertEquals("Header flag should match",
+                false, processor.props.useHeader);
+        Assert.assertEquals("Lines to skip should match",
+                0, processor.props.linesToSkip);
+
+        runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
+        runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
+        runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
+        runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
+        runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
+        runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
+
+        // check updates
+        processor.createCSVProperties(context);
+        Assert.assertEquals("Charset should match",
+                "utf16", processor.props.charset);
+        Assert.assertEquals("Delimiter should match",
+                "|", processor.props.delimiter);
+        Assert.assertEquals("Quote should match",
+                "'", processor.props.quote);
+        Assert.assertEquals("Escape should match",
+                "\u2603", processor.props.escape);
+        Assert.assertEquals("Header flag should match",
+                true, processor.props.useHeader);
+        Assert.assertEquals("Lines to skip should match",
+                2, processor.props.linesToSkip);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
index 7b1019d..724a4c6 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConfigurationProperty.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.File;
@@ -35,43 +34,43 @@ import org.kitesdk.data.spi.DefaultConfiguration;
 
 public class TestConfigurationProperty {
 
-  @Rule
-  public final TemporaryFolder temp = new TemporaryFolder();
-  public File confLocation;
+    @Rule
+    public final TemporaryFolder temp = new TemporaryFolder();
+    public File confLocation;
 
-  @Before
-  public void saveConfiguration() throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.setBoolean("nifi.config.canary", true);
+    @Before
+    public void saveConfiguration() throws IOException {
+        Configuration conf = new Configuration(false);
+        conf.setBoolean("nifi.config.canary", true);
 
-    confLocation = temp.newFile("nifi-conf.xml");
-    FileOutputStream out = new FileOutputStream(confLocation);
-    conf.writeXml(out);
-    out.close();
-  }
+        confLocation = temp.newFile("nifi-conf.xml");
+        FileOutputStream out = new FileOutputStream(confLocation);
+        conf.writeXml(out);
+        out.close();
+    }
 
-  @Test
-  public void testConfigurationCanary() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
+    @Test
+    public void testConfigurationCanary() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
 
-    Assert.assertFalse("Should not contain canary value",
-        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
+        Assert.assertFalse("Should not contain canary value",
+                DefaultConfiguration.get().getBoolean("nifi.config.canary", 
false));
 
-    AbstractKiteProcessor processor = new StoreInKiteDataset();
-    ProcessContext context = runner.getProcessContext();
-    processor.setDefaultConfiguration(context);
+        AbstractKiteProcessor processor = new StoreInKiteDataset();
+        ProcessContext context = runner.getProcessContext();
+        processor.setDefaultConfiguration(context);
 
-    Assert.assertTrue("Should contain canary value",
-        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
-  }
+        Assert.assertTrue("Should contain canary value",
+                DefaultConfiguration.get().getBoolean("nifi.config.canary", 
false));
+    }
 
-  @Test
-  public void testFilesMustExist() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
-    runner.assertNotValid();
-  }
+    @Test
+    public void testFilesMustExist() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                AbstractKiteProcessor.CONF_XML_FILES, 
temp.newFile().toString());
+        runner.assertNotValid();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
index a3489ec..9354e8f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestGetSchema.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.File;
@@ -39,63 +38,63 @@ import static 
org.apache.nifi.processors.kite.TestUtil.bytesFor;
 
 public class TestGetSchema {
 
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  @Test
-  @Ignore("Does not work on windows")
-  public void testSchemaFromFileSystem() throws IOException {
-    File schemaFile = temp.newFile("schema.avsc");
-    FileOutputStream out = new FileOutputStream(schemaFile);
-    out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
-    out.close();
-
-    Schema schema = AbstractKiteProcessor.getSchema(
-        schemaFile.toString(), DefaultConfiguration.get());
-
-    Assert.assertEquals("Schema from file should match", SCHEMA, schema);
-  }
-
-  @Test
-  @Ignore("Does not work on windows")
-  public void testSchemaFromKiteURIs() throws IOException {
-    String location = temp.newFolder("ns", "temp").toString();
-    if (location.endsWith("/")) {
-      location = location.substring(0, location.length() - 1);
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
+
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+
+    @Test
+    @Ignore("Does not work on windows")
+    public void testSchemaFromFileSystem() throws IOException {
+        File schemaFile = temp.newFile("schema.avsc");
+        FileOutputStream out = new FileOutputStream(schemaFile);
+        out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
+        out.close();
+
+        Schema schema = AbstractKiteProcessor.getSchema(
+                schemaFile.toString(), DefaultConfiguration.get());
+
+        Assert.assertEquals("Schema from file should match", SCHEMA, schema);
+    }
+
+    @Test
+    @Ignore("Does not work on windows")
+    public void testSchemaFromKiteURIs() throws IOException {
+        String location = temp.newFolder("ns", "temp").toString();
+        if (location.endsWith("/")) {
+            location = location.substring(0, location.length() - 1);
+        }
+        String datasetUri = "dataset:" + location;
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schema(SCHEMA)
+                .build();
+
+        Datasets.create(datasetUri, descriptor);
+
+        Schema schema = AbstractKiteProcessor.getSchema(
+                datasetUri, DefaultConfiguration.get());
+        Assert.assertEquals("Schema from dataset URI should match", SCHEMA, 
schema);
+
+        schema = AbstractKiteProcessor.getSchema(
+                "view:file:" + location + "?color=orange", 
DefaultConfiguration.get());
+        Assert.assertEquals("Schema from view URI should match", SCHEMA, 
schema);
+    }
+
+    @Test
+    public void testSchemaFromResourceURI() throws IOException {
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schemaUri("resource:schema/user.avsc") // in kite-data-core 
test-jar
+                .build();
+        Schema expected = descriptor.getSchema();
+
+        Schema schema = AbstractKiteProcessor.getSchema(
+                "resource:schema/user.avsc", DefaultConfiguration.get());
+
+        Assert.assertEquals("Schema from resource URI should match",
+                expected, schema);
     }
-    String datasetUri = "dataset:" + location;
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(SCHEMA)
-        .build();
-
-    Datasets.create(datasetUri, descriptor);
-
-    Schema schema = AbstractKiteProcessor.getSchema(
-        datasetUri, DefaultConfiguration.get());
-    Assert.assertEquals("Schema from dataset URI should match", SCHEMA, 
schema);
-
-    schema = AbstractKiteProcessor.getSchema(
-        "view:file:" + location + "?color=orange", DefaultConfiguration.get());
-    Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
-  }
-
-  @Test
-  public void testSchemaFromResourceURI() throws IOException {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
-        .build();
-    Schema expected = descriptor.getSchema();
-
-    Schema schema = AbstractKiteProcessor.getSchema(
-        "resource:schema/user.avsc", DefaultConfiguration.get());
-
-    Assert.assertEquals("Schema from resource URI should match",
-        expected, schema);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index 434b969..d50e7f9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
@@ -30,32 +29,33 @@ import org.junit.Test;
 import static org.apache.nifi.processors.kite.TestUtil.streamFor;
 
 public class TestJSONToAvroProcessor {
-  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
-      .requiredLong("id")
-      .requiredString("color")
-      .optionalDouble("price")
-      .endRecord();
-
-  public static final String JSON_CONTENT = "" +
-      "{\"id\": 1,\"color\": \"green\"}" +
-      "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
-      "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
-
-  @Test
-  public void testBasicConversion() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
-    runner.assertNotValid();
-    runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
-    runner.assertValid();
-
-    runner.enqueue(streamFor(JSON_CONTENT));
-    runner.run();
-
-    long converted = runner.getCounterValue("Converted records");
-    long errors = runner.getCounterValue("Conversion errors");
-    Assert.assertEquals("Should convert 2 rows", 2, converted);
-    Assert.assertEquals("Should reject 1 row", 1, errors);
-
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+
+    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
+
+    public static final String JSON_CONTENT = ""
+            + "{\"id\": 1,\"color\": \"green\"}"
+            + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is 
a string
+            "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
+
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(JSON_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
index 00a98db..087e1cb 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteProcessorsCluster.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.Lists;
@@ -54,79 +53,79 @@ import static org.apache.nifi.processors.kite.TestUtil.user;
 @Ignore("Does not work on windows")
 public class TestKiteProcessorsCluster {
 
-  public static MiniCluster cluster = null;
-  public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-      .schema(USER_SCHEMA)
-      .build();
-
-  @BeforeClass
-  public static void startCluster() throws IOException, InterruptedException {
-    long rand = Math.abs((long) (Math.random() * 1000000));
-    cluster = new MiniCluster.Builder()
-        .workDir("/tmp/minicluster-" + rand)
-        .clean(true)
-        .addService(HdfsService.class)
-        .addService(HiveService.class)
-        .bindIP("127.0.0.1")
-        .hiveMetastorePort(9083)
-        .build();
-    cluster.start();
-  }
-
-  @AfterClass
-  public static void stopCluster() throws IOException, InterruptedException {
-    if (cluster != null) {
-      cluster.stop();
-      cluster = null;
+    public static MiniCluster cluster = null;
+    public static DatasetDescriptor descriptor = new 
DatasetDescriptor.Builder()
+            .schema(USER_SCHEMA)
+            .build();
+
+    @BeforeClass
+    public static void startCluster() throws IOException, InterruptedException 
{
+        long rand = Math.abs((long) (Math.random() * 1000000));
+        cluster = new MiniCluster.Builder()
+                .workDir("/tmp/minicluster-" + rand)
+                .clean(true)
+                .addService(HdfsService.class)
+                .addService(HiveService.class)
+                .bindIP("127.0.0.1")
+                .hiveMetastorePort(9083)
+                .build();
+        cluster.start();
     }
-  }
 
-  @Test
-  public void testBasicStoreToHive() throws IOException {
-    String datasetUri = "dataset:hive:ns/test";
+    @AfterClass
+    public static void stopCluster() throws IOException, InterruptedException {
+        if (cluster != null) {
+            cluster.stop();
+            cluster = null;
+        }
+    }
 
-    Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, 
Record.class);
+    @Test
+    public void testBasicStoreToHive() throws IOException {
+        String datasetUri = "dataset:hive:ns/test";
 
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.assertNotValid();
+        Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, 
Record.class);
 
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.assertNotValid();
 
-    List<Record> users = Lists.newArrayList(
-        user("a", "[email protected]"),
-        user("b", "[email protected]"),
-        user("c", "[email protected]")
-    );
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
 
-    runner.enqueue(streamFor(users));
-    runner.run();
+        List<Record> users = Lists.newArrayList(
+                user("a", "[email protected]"),
+                user("b", "[email protected]"),
+                user("c", "[email protected]")
+        );
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-    List<Record> stored = Lists.newArrayList(
-        (Iterable<Record>) dataset.newReader());
-    Assert.assertEquals("Records should match", users, stored);
+        runner.enqueue(streamFor(users));
+        runner.run();
 
-    Datasets.delete(datasetUri);
-  }
+        runner.assertAllFlowFilesTransferred("success", 1);
+        List<Record> stored = Lists.newArrayList(
+                (Iterable<Record>) dataset.newReader());
+        Assert.assertEquals("Records should match", users, stored);
 
-  @Test
-  public void testSchemaFromDistributedFileSystem() throws IOException {
-    Schema expected = SchemaBuilder.record("Test").fields()
-        .requiredLong("id")
-        .requiredString("color")
-        .optionalDouble("price")
-        .endRecord();
+        Datasets.delete(datasetUri);
+    }
 
-    Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
-    FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
-    OutputStream out = fs.create(schemaPath);
-    out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
-    out.close();
+    @Test
+    public void testSchemaFromDistributedFileSystem() throws IOException {
+        Schema expected = SchemaBuilder.record("Test").fields()
+                .requiredLong("id")
+                .requiredString("color")
+                .optionalDouble("price")
+                .endRecord();
 
-    Schema schema = AbstractKiteProcessor.getSchema(
-        schemaPath.toString(), DefaultConfiguration.get());
+        Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
+        FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
+        OutputStream out = fs.create(schemaPath);
+        out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
+        out.close();
 
-    Assert.assertEquals("Schema from file should match", expected, schema);
-  }
+        Schema schema = AbstractKiteProcessor.getSchema(
+                schemaPath.toString(), DefaultConfiguration.get());
+
+        Assert.assertEquals("Schema from file should match", expected, schema);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
index 5063f5d..3fcae4f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestKiteStorageProcessor.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import com.google.common.collect.Lists;
@@ -47,125 +46,125 @@ import static 
org.apache.nifi.processors.kite.TestUtil.user;
 @Ignore("Does not work on windows")
 public class TestKiteStorageProcessor {
 
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-
-  private String datasetUri = null;
-  private Dataset<Record> dataset = null;
-
-  @Before
-  public void createDataset() throws Exception {
-    DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
-        .schema(TestUtil.USER_SCHEMA)
-        .build();
-    this.datasetUri = "dataset:file:" + temp.newFolder("ns", 
"temp").toString();
-    this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
-  }
-
-  @After
-  public void deleteDataset() throws Exception {
-    Datasets.delete(datasetUri);
-  }
-
-  @Test
-  public void testBasicStore() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.assertNotValid();
-
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    List<Record> users = Lists.newArrayList(
-        user("a", "[email protected]"),
-        user("b", "[email protected]"),
-        user("c", "[email protected]")
-    );
-
-    runner.enqueue(streamFor(users));
-    runner.run();
-
-    runner.assertAllFlowFilesTransferred("success", 1);
-    runner.assertQueueEmpty();
-    Assert.assertEquals("Should store 3 values",
-        3, (long) runner.getCounterValue("Stored records"));
-
-    List<Record> stored = Lists.newArrayList(
-        (Iterable<Record>) dataset.newReader());
-    Assert.assertEquals("Records should match", users, stored);
-  }
-
-  @Test
-  public void testViewURI() {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
-    runner.assertValid();
-  }
-
-  @Test
-  public void testInvalidURI() {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(
-        StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
-    runner.assertNotValid();
-  }
-
-  @Test
-  public void testUnreadableContent() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    runner.enqueue(invalidStreamFor(user("a", "[email protected]")));
-    runner.run();
-
-    runner.assertAllFlowFilesTransferred("failure", 1);
-  }
-
-  @Test
-  public void testCorruptedBlocks() throws IOException {
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
-
-    List<Record> records = Lists.newArrayList();
-    for (int i = 0; i < 10000; i += 1) {
-      String num = String.valueOf(i);
-      records.add(user(num, num + "@example.com"));
+    @Rule
+    public TemporaryFolder temp = new TemporaryFolder();
+
+    private String datasetUri = null;
+    private Dataset<Record> dataset = null;
+
+    @Before
+    public void createDataset() throws Exception {
+        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
+                .schema(TestUtil.USER_SCHEMA)
+                .build();
+        this.datasetUri = "dataset:file:" + temp.newFolder("ns", 
"temp").toString();
+        this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
+    }
+
+    @After
+    public void deleteDataset() throws Exception {
+        Datasets.delete(datasetUri);
+    }
+
+    @Test
+    public void testBasicStore() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.assertNotValid();
+
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
+
+        List<Record> users = Lists.newArrayList(
+                user("a", "[email protected]"),
+                user("b", "[email protected]"),
+                user("c", "[email protected]")
+        );
+
+        runner.enqueue(streamFor(users));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("success", 1);
+        runner.assertQueueEmpty();
+        Assert.assertEquals("Should store 3 values",
+                3, (long) runner.getCounterValue("Stored records"));
+
+        List<Record> stored = Lists.newArrayList(
+                (Iterable<Record>) dataset.newReader());
+        Assert.assertEquals("Records should match", users, stored);
+    }
+
+    @Test
+    public void testViewURI() {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                StoreInKiteDataset.KITE_DATASET_URI, 
"view:hive:ns/table?year=2015");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testInvalidURI() {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(
+                StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
+        runner.assertNotValid();
     }
 
-    runner.enqueue(invalidStreamFor(records));
-    runner.run();
+    @Test
+    public void testUnreadableContent() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
 
-    long stored = runner.getCounterValue("Stored records");
-    Assert.assertTrue("Should store some readable values",
-        0 < stored && stored < 10000);
+        runner.enqueue(invalidStreamFor(user("a", "[email protected]")));
+        runner.run();
 
-    runner.assertAllFlowFilesTransferred("success", 1);
-  }
+        runner.assertAllFlowFilesTransferred("failure", 1);
+    }
 
-  @Test
-  public void testIncompatibleSchema() throws IOException {
-    Schema incompatible = SchemaBuilder.record("User").fields()
-        .requiredLong("id")
-        .requiredString("username")
-        .optionalString("email") // the dataset requires this field
-        .endRecord();
+    @Test
+    public void testCorruptedBlocks() throws IOException {
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
 
-    // this user has the email field and could be stored, but the schema is
-    // still incompatible so the entire stream is rejected
-    Record incompatibleUser = new Record(incompatible);
-    incompatibleUser.put("id", 1L);
-    incompatibleUser.put("username", "a");
-    incompatibleUser.put("email", "[email protected]");
+        List<Record> records = Lists.newArrayList();
+        for (int i = 0; i < 10000; i += 1) {
+            String num = String.valueOf(i);
+            records.add(user(num, num + "@example.com"));
+        }
 
-    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
-    runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
-    runner.assertValid();
+        runner.enqueue(invalidStreamFor(records));
+        runner.run();
 
-    runner.enqueue(streamFor(incompatibleUser));
-    runner.run();
+        long stored = runner.getCounterValue("Stored records");
+        Assert.assertTrue("Should store some readable values",
+                0 < stored && stored < 10000);
 
-    runner.assertAllFlowFilesTransferred("incompatible", 1);
-  }
+        runner.assertAllFlowFilesTransferred("success", 1);
+    }
+
+    @Test
+    public void testIncompatibleSchema() throws IOException {
+        Schema incompatible = SchemaBuilder.record("User").fields()
+                .requiredLong("id")
+                .requiredString("username")
+                .optionalString("email") // the dataset requires this field
+                .endRecord();
+
+        // this user has the email field and could be stored, but the schema is
+        // still incompatible so the entire stream is rejected
+        Record incompatibleUser = new Record(incompatible);
+        incompatibleUser.put("id", 1L);
+        incompatibleUser.put("username", "a");
+        incompatibleUser.put("email", "[email protected]");
+
+        TestRunner runner = 
TestRunners.newTestRunner(StoreInKiteDataset.class);
+        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
+        runner.assertValid();
+
+        runner.enqueue(streamFor(incompatibleUser));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred("incompatible", 1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b2a1f521/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
index 2eb30af..37ddbec 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestUtil.java
@@ -16,7 +16,6 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.nifi.processors.kite;
 
 import java.io.ByteArrayInputStream;
@@ -37,67 +36,68 @@ import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 
 public class TestUtil {
-  public static final Schema USER_SCHEMA = 
SchemaBuilder.record("User").fields()
-      .requiredString("username")
-      .requiredString("email")
-      .endRecord();
-
-  public static Record user(String username, String email) {
-    Record user = new Record(USER_SCHEMA);
-    user.put("username", username);
-    user.put("email", email);
-    return user;
-  }
-
-  public static InputStream streamFor(Record... records) throws IOException {
-    return streamFor(Arrays.asList(records));
-  }
-
-  public static InputStream streamFor(List<Record> records) throws IOException 
{
-    return new ByteArrayInputStream(bytesFor(records));
-  }
-
-  public static InputStream invalidStreamFor(Record... records) throws 
IOException {
-    return invalidStreamFor(Arrays.asList(records));
-  }
-
-  public static InputStream invalidStreamFor(List<Record> records) throws 
IOException {
-    // purposely truncate the content
-    byte[] bytes = bytesFor(records);
-    return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
-  }
-
-  private static byte[] bytesFor(List<Record> records) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    DataFileWriter<Record> writer = new DataFileWriter<>(
-        AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
-    writer.setCodec(CodecFactory.snappyCodec());
-    writer = writer.create(records.get(0).getSchema(), out);
-
-    for (Record record : records) {
-      writer.append(record);
+
+    public static final Schema USER_SCHEMA = 
SchemaBuilder.record("User").fields()
+            .requiredString("username")
+            .requiredString("email")
+            .endRecord();
+
+    public static Record user(String username, String email) {
+        Record user = new Record(USER_SCHEMA);
+        user.put("username", username);
+        user.put("email", email);
+        return user;
     }
 
-    writer.flush();
+    public static InputStream streamFor(Record... records) throws IOException {
+        return streamFor(Arrays.asList(records));
+    }
 
-    return out.toByteArray();
-  }
+    public static InputStream streamFor(List<Record> records) throws 
IOException {
+        return new ByteArrayInputStream(bytesFor(records));
+    }
+
+    public static InputStream invalidStreamFor(Record... records) throws 
IOException {
+        return invalidStreamFor(Arrays.asList(records));
+    }
+
+    public static InputStream invalidStreamFor(List<Record> records) throws 
IOException {
+        // purposely truncate the content
+        byte[] bytes = bytesFor(records);
+        return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
+    }
 
-  public static InputStream streamFor(String content) throws 
CharacterCodingException {
-    return streamFor(content, Charset.forName("utf8"));
-  }
+    private static byte[] bytesFor(List<Record> records) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(records.get(0).getSchema(), 
Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
+        writer = writer.create(records.get(0).getSchema(), out);
 
-  public static InputStream streamFor(String content, Charset charset) throws 
CharacterCodingException {
-    return new ByteArrayInputStream(bytesFor(content, charset));
-  }
+        for (Record record : records) {
+            writer.append(record);
+        }
 
-  public static byte[] bytesFor(String content, Charset charset) throws 
CharacterCodingException {
-    CharBuffer chars = CharBuffer.wrap(content);
-    CharsetEncoder encoder = charset.newEncoder();
-    ByteBuffer buffer = encoder.encode(chars);
-    byte[] bytes = new byte[buffer.remaining()];
-    buffer.get(bytes);
-    return bytes;
-  }
+        writer.flush();
+
+        return out.toByteArray();
+    }
+
+    public static InputStream streamFor(String content) throws 
CharacterCodingException {
+        return streamFor(content, Charset.forName("utf8"));
+    }
+
+    public static InputStream streamFor(String content, Charset charset) 
throws CharacterCodingException {
+        return new ByteArrayInputStream(bytesFor(content, charset));
+    }
+
+    public static byte[] bytesFor(String content, Charset charset) throws 
CharacterCodingException {
+        CharBuffer chars = CharBuffer.wrap(content);
+        CharsetEncoder encoder = charset.newEncoder();
+        ByteBuffer buffer = encoder.encode(chars);
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+        return bytes;
+    }
 
 }

Reply via email to