http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
new file mode 100644
index 0000000..a81e843
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import java.math.BigInteger;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.TimeZone;
+import java.util.function.Consumer;
+
+public class DataTypeUtils {
+
+    private static final TimeZone gmt = TimeZone.getTimeZone("gmt");
+
+    public static Object convertType(final Object value, final DataType 
dataType, final String fieldName) {
+        return convertType(value, dataType, 
RecordFieldType.DATE.getDefaultFormat(), 
RecordFieldType.TIME.getDefaultFormat(), 
RecordFieldType.TIMESTAMP.getDefaultFormat(), fieldName);
+    }
+
+    public static Object convertType(final Object value, final DataType 
dataType, final String dateFormat, final String timeFormat, final String 
timestampFormat, final String fieldName) {
+        switch (dataType.getFieldType()) {
+            case BIGINT:
+                return toBigInt(value, fieldName);
+            case BOOLEAN:
+                return toBoolean(value, fieldName);
+            case BYTE:
+                return toByte(value, fieldName);
+            case CHAR:
+                return toCharacter(value, fieldName);
+            case DATE:
+                return toDate(value, dateFormat, fieldName);
+            case DOUBLE:
+                return toDouble(value, fieldName);
+            case FLOAT:
+                return toFloat(value, fieldName);
+            case INT:
+                return toInteger(value, fieldName);
+            case LONG:
+                return toLong(value, fieldName);
+            case SHORT:
+                return toShort(value, fieldName);
+            case STRING:
+                return toString(value, dateFormat, timeFormat, 
timestampFormat);
+            case TIME:
+                return toTime(value, timeFormat, fieldName);
+            case TIMESTAMP:
+                return toTimestamp(value, timestampFormat, fieldName);
+            case ARRAY:
+                return toArray(value, fieldName);
+            case MAP:
+                return toMap(value, fieldName);
+            case RECORD:
+                final RecordDataType recordType = (RecordDataType) dataType;
+                final RecordSchema childSchema = recordType.getChildSchema();
+                return toRecord(value, childSchema, fieldName);
+            case CHOICE: {
+                if (value == null) {
+                    return null;
+                }
+
+                final ChoiceDataType choiceDataType = (ChoiceDataType) 
dataType;
+                final DataType chosenDataType = chooseDataType(value, 
choiceDataType);
+                if (chosenDataType == null) {
+                    throw new IllegalTypeConversionException("Cannot convert 
value [" + value + "] of type " + value.getClass()
+                        + " for field " + fieldName + " to any of the 
following available Sub-Types for a Choice: " + 
choiceDataType.getPossibleSubTypes());
+                }
+
+                return convertType(value, chosenDataType, fieldName);
+            }
+        }
+
+        return null;
+    }
+
+
+    public static boolean isCompatibleDataType(final Object value, final 
DataType dataType) {
+        switch (dataType.getFieldType()) {
+            case ARRAY:
+                return isArrayTypeCompatible(value);
+            case BIGINT:
+                return isBigIntTypeCompatible(value);
+            case BOOLEAN:
+                return isBooleanTypeCompatible(value);
+            case BYTE:
+                return isByteTypeCompatible(value);
+            case CHAR:
+                return isCharacterTypeCompatible(value);
+            case DATE:
+                return isDateTypeCompatible(value, dataType.getFormat());
+            case DOUBLE:
+                return isDoubleTypeCompatible(value);
+            case FLOAT:
+                return isFloatTypeCompatible(value);
+            case INT:
+                return isIntegerTypeCompatible(value);
+            case LONG:
+                return isLongTypeCompatible(value);
+            case RECORD:
+                return isRecordTypeCompatible(value);
+            case SHORT:
+                return isShortTypeCompatible(value);
+            case TIME:
+                return isTimeTypeCompatible(value, dataType.getFormat());
+            case TIMESTAMP:
+                return isTimestampTypeCompatible(value, dataType.getFormat());
+            case STRING:
+                return isStringTypeCompatible(value);
+            case MAP:
+                return isMapTypeCompatible(value);
+            case CHOICE: {
+                final DataType chosenDataType = chooseDataType(value, 
(ChoiceDataType) dataType);
+                return chosenDataType != null;
+            }
+        }
+
+        return false;
+    }
+
+    public static DataType chooseDataType(final Object value, final 
ChoiceDataType choiceType) {
+        for (final DataType subType : choiceType.getPossibleSubTypes()) {
+            if (isCompatibleDataType(value, subType)) {
+                return subType;
+            }
+        }
+
+        return null;
+    }
+
+    public static Record toRecord(final Object value, final RecordSchema 
recordSchema, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Record) {
+            return ((Record) value);
+        }
+
+        if (value instanceof Map) {
+            if (recordSchema == null) {
+                throw new IllegalTypeConversionException("Cannot convert value 
[" + value + "] of type " + value.getClass()
+                    + " to Record for field " + fieldName + " because the 
value is a Map but no Record Schema was provided");
+            }
+
+            final Map<?, ?> map = (Map<?, ?>) value;
+            final Map<String, Object> coercedValues = new HashMap<>();
+
+            for (final Map.Entry<?, ?> entry : map.entrySet()) {
+                final Object keyValue = entry.getKey();
+                if (keyValue == null) {
+                    continue;
+                }
+
+                final String key = keyValue.toString();
+                final Optional<DataType> desiredTypeOption = 
recordSchema.getDataType(key);
+                if (!desiredTypeOption.isPresent()) {
+                    continue;
+                }
+
+                final Object rawValue = entry.getValue();
+                final Object coercedValue = convertType(rawValue, 
desiredTypeOption.get(), fieldName);
+                coercedValues.put(key, coercedValue);
+            }
+
+            return new MapRecord(recordSchema, coercedValues);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Record for field " + fieldName);
+    }
+
+    public static boolean isRecordTypeCompatible(final Object value) {
+        return value != null && value instanceof Record;
+    }
+
+    public static Object[] toArray(final Object value, final String fieldName) 
{
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Object[]) {
+            return (Object[]) value;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Object Array for field " + 
fieldName);
+    }
+
+    public static boolean isArrayTypeCompatible(final Object value) {
+        return value != null && value instanceof Object[];
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Map<String, Object> toMap(final Object value, final String 
fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Map) {
+            final Map<?, ?> original = (Map<?, ?>) value;
+
+            boolean keysAreStrings = true;
+            for (final Object key : original.keySet()) {
+                if (!(key instanceof String)) {
+                    keysAreStrings = false;
+                }
+            }
+
+            if (keysAreStrings) {
+                return (Map<String, Object>) value;
+            }
+
+            final Map<String, Object> transformed = new HashMap<>();
+            for (final Map.Entry<?, ?> entry : original.entrySet()) {
+                final Object key = entry.getKey();
+                if (key == null) {
+                    transformed.put(null, entry.getValue());
+                } else {
+                    transformed.put(key.toString(), entry.getValue());
+                }
+            }
+
+            return transformed;
+        }
+
+        if (value instanceof Record) {
+            final Record record = (Record) value;
+            final RecordSchema recordSchema = record.getSchema();
+            if (recordSchema == null) {
+                throw new IllegalTypeConversionException("Cannot convert value 
[" + value + "] of type Record to Map for field " + fieldName
+                    + " because Record does not have an associated Schema");
+            }
+
+            final Map<String, Object> map = new HashMap<>();
+            for (final String recordFieldName : recordSchema.getFieldNames()) {
+                map.put(recordFieldName, record.getValue(recordFieldName));
+            }
+
+            return map;
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Map for field " + fieldName);
+    }
+
+    public static boolean isMapTypeCompatible(final Object value) {
+        return value != null && value instanceof Map;
+    }
+
+
+    public static String toString(final Object value, final String dateFormat, 
final String timeFormat, final String timestampFormat) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof String) {
+            return (String) value;
+        }
+
+        if (value instanceof java.sql.Date) {
+            return getDateFormat(dateFormat).format((java.util.Date) value);
+        }
+        if (value instanceof java.sql.Time) {
+            return getDateFormat(timeFormat).format((java.util.Date) value);
+        }
+        if (value instanceof java.sql.Timestamp) {
+            return getDateFormat(timestampFormat).format((java.util.Date) 
value);
+        }
+        if (value instanceof java.util.Date) {
+            return getDateFormat(timestampFormat).format((java.util.Date) 
value);
+        }
+
+        return value.toString();
+    }
+
+    public static boolean isStringTypeCompatible(final Object value) {
+        return value != null;
+    }
+
+    public static java.sql.Date toDate(final Object value, final String 
format, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Date) {
+            return (Date) value;
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Date(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = 
getDateFormat(format).parse((String) value);
+                return new Date(utilDate.getTime());
+            } catch (final ParseException e) {
+                throw new IllegalTypeConversionException("Could not convert 
value [" + value
+                    + "] of type java.lang.String to Date because the value is 
not in the expected date format: " + format + " for field " + fieldName);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Date for field " + fieldName);
+    }
+
+    public static boolean isDateTypeCompatible(final Object value, final 
String format) {
+        if (value == null) {
+            return false;
+        }
+
+        if (value instanceof java.util.Date || value instanceof Number) {
+            return true;
+        }
+
+        if (value instanceof String) {
+            try {
+                getDateFormat(format).parse((String) value);
+                return true;
+            } catch (final ParseException e) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+
+    public static Time toTime(final Object value, final String format, final 
String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Time) {
+            return (Time) value;
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Time(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = 
getDateFormat(format).parse((String) value);
+                return new Time(utilDate.getTime());
+            } catch (final ParseException e) {
+                throw new IllegalTypeConversionException("Could not convert 
value [" + value
+                    + "] of type java.lang.String to Time for field " + 
fieldName + " because the value is not in the expected date format: " + format);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Time for field " + fieldName);
+    }
+
+    private static DateFormat getDateFormat(final String format) {
+        final DateFormat df = new SimpleDateFormat(format);
+        df.setTimeZone(gmt);
+        return df;
+    }
+
+    public static boolean isTimeTypeCompatible(final Object value, final 
String format) {
+        return isDateTypeCompatible(value, format);
+    }
+
+    public static Timestamp toTimestamp(final Object value, final String 
format, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Timestamp) {
+            return (Timestamp) value;
+        }
+
+        if (value instanceof Number) {
+            final long longValue = ((Number) value).longValue();
+            return new Timestamp(longValue);
+        }
+
+        if (value instanceof String) {
+            try {
+                final java.util.Date utilDate = 
getDateFormat(format).parse((String) value);
+                return new Timestamp(utilDate.getTime());
+            } catch (final ParseException e) {
+                throw new IllegalTypeConversionException("Could not convert 
value [" + value
+                    + "] of type java.lang.String to Timestamp for field " + 
fieldName + " because the value is not in the expected date format: " + format);
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Timestamp for field " + 
fieldName);
+    }
+
+    public static boolean isTimestampTypeCompatible(final Object value, final 
String format) {
+        return isDateTypeCompatible(value, format);
+    }
+
+
+    public static BigInteger toBigInt(final Object value, final String 
fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof BigInteger) {
+            return (BigInteger) value;
+        }
+        if (value instanceof Long) {
+            return BigInteger.valueOf((Long) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to BigInteger for field " + 
fieldName);
+    }
+
+    public static boolean isBigIntTypeCompatible(final Object value) {
+        return value == null && (value instanceof BigInteger || value 
instanceof Long);
+    }
+
+    public static Boolean toBoolean(final Object value, final String 
fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Boolean) {
+            return (Boolean) value;
+        }
+        if (value instanceof String) {
+            final String string = (String) value;
+            if (string.equalsIgnoreCase("true")) {
+                return Boolean.TRUE;
+            } else if (string.equalsIgnoreCase("false")) {
+                return Boolean.FALSE;
+            }
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Boolean for field " + fieldName);
+    }
+
+    public static boolean isBooleanTypeCompatible(final Object value) {
+        if (value == null) {
+            return false;
+        }
+        if (value instanceof Boolean) {
+            return true;
+        }
+        if (value instanceof String) {
+            final String string = (String) value;
+            return string.equalsIgnoreCase("true") || 
string.equalsIgnoreCase("false");
+        }
+        return false;
+    }
+
+    public static Double toDouble(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).doubleValue();
+        }
+
+        if (value instanceof String) {
+            return Double.parseDouble((String) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Double for field " + fieldName);
+    }
+
+    public static boolean isDoubleTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, s -> Double.parseDouble(s));
+    }
+
+    private static boolean isNumberTypeCompatible(final Object value, final 
Consumer<String> stringValueVerifier) {
+        if (value == null) {
+            return false;
+        }
+
+        if (value instanceof Number) {
+            return true;
+        }
+
+        if (value instanceof String) {
+            try {
+                stringValueVerifier.accept((String) value);
+                return true;
+            } catch (final NumberFormatException nfe) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+
+    public static Float toFloat(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).floatValue();
+        }
+
+        if (value instanceof String) {
+            return Float.parseFloat((String) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Float for field " + fieldName);
+    }
+
+    public static boolean isFloatTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, s -> Float.parseFloat(s));
+    }
+
+    public static Long toLong(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).longValue();
+        }
+
+        if (value instanceof String) {
+            return Long.parseLong((String) value);
+        }
+
+        if (value instanceof java.util.Date) {
+            return ((java.util.Date) value).getTime();
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Long for field " + fieldName);
+    }
+
+    public static boolean isLongTypeCompatible(final Object value) {
+        if (value == null) {
+            return false;
+        }
+
+        if (value instanceof Number) {
+            return true;
+        }
+
+        if (value instanceof java.util.Date) {
+            return true;
+        }
+
+        if (value instanceof String) {
+            try {
+                Long.parseLong((String) value);
+                return true;
+            } catch (final NumberFormatException nfe) {
+                return false;
+            }
+        }
+
+        return false;
+    }
+
+
+    public static Integer toInteger(final Object value, final String 
fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).intValue();
+        }
+
+        if (value instanceof String) {
+            return Integer.parseInt((String) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Integer for field " + fieldName);
+    }
+
+    public static boolean isIntegerTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, s -> Integer.parseInt(s));
+    }
+
+
+    public static Short toShort(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).shortValue();
+        }
+
+        if (value instanceof String) {
+            return Short.parseShort((String) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Short for field " + fieldName);
+    }
+
+    public static boolean isShortTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, s -> Short.parseShort(s));
+    }
+
+    public static Byte toByte(final Object value, final String fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Number) {
+            return ((Number) value).byteValue();
+        }
+
+        if (value instanceof String) {
+            return Byte.parseByte((String) value);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Byte for field " + fieldName);
+    }
+
+    public static boolean isByteTypeCompatible(final Object value) {
+        return isNumberTypeCompatible(value, s -> Byte.parseByte(s));
+    }
+
+
+    public static Character toCharacter(final Object value, final String 
fieldName) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof Character) {
+            return ((Character) value);
+        }
+
+        if (value instanceof CharSequence) {
+            final CharSequence charSeq = (CharSequence) value;
+            if (charSeq.length() == 0) {
+                throw new IllegalTypeConversionException("Cannot convert value 
[" + value + "] of type " + value.getClass()
+                    + " to Character because it has a length of 0 for field " 
+ fieldName);
+            }
+
+            return charSeq.charAt(0);
+        }
+
+        throw new IllegalTypeConversionException("Cannot convert value [" + 
value + "] of type " + value.getClass() + " to Character for field " + 
fieldName);
+    }
+
+    public static boolean isCharacterTypeCompatible(final Object value) {
+        return value != null && (value instanceof Character || (value 
instanceof CharSequence && ((CharSequence) value).length() > 0));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
new file mode 100644
index 0000000..38b5d20
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record.util;
+
/**
 * Thrown to indicate that a value could not be coerced to the data type required
 * by a record field.
 */
public class IllegalTypeConversionException extends RuntimeException {
    // RuntimeException is Serializable; declare an explicit serialVersionUID so the
    // serialized form does not depend on compiler-generated defaults.
    private static final long serialVersionUID = 1L;

    public IllegalTypeConversionException(final String message) {
        super(message);
    }

    public IllegalTypeConversionException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
new file mode 100644
index 0000000..5a61275
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSimpleRecordSchema {
+
+    @Test
+    public void testPreventsTwoFieldsWithSameAlias() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("hello", 
RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
+        fields.add(new RecordField("goodbye", 
RecordFieldType.STRING.getDataType(), null, set("baz", "bar")));
+
+        try {
+            new SimpleRecordSchema(fields);
+            Assert.fail("Was able to create two fields with same alias");
+        } catch (final IllegalArgumentException expected) {
+        }
+    }
+
+    @Test
+    public void testPreventsTwoFieldsWithSameName() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("hello", 
RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
+        fields.add(new RecordField("hello", 
RecordFieldType.STRING.getDataType()));
+
+        try {
+            new SimpleRecordSchema(fields);
+            Assert.fail("Was able to create two fields with same name");
+        } catch (final IllegalArgumentException expected) {
+        }
+    }
+
+    @Test
+    public void testPreventsTwoFieldsWithConflictingNamesAliases() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("hello", 
RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
+        fields.add(new RecordField("bar", 
RecordFieldType.STRING.getDataType()));
+
+        try {
+            new SimpleRecordSchema(fields);
+            Assert.fail("Was able to create two fields with conflicting 
names/aliases");
+        } catch (final IllegalArgumentException expected) {
+        }
+    }
+
+    private Set<String> set(final String... values) {
+        final Set<String> set = new HashSet<>();
+        for (final String value : values) {
+            set.add(value);
+        }
+        return set;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
new file mode 100644
index 0000000..82e20a6
--- /dev/null
+++ 
b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/record/TestMapRecord.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.serialization.record;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMapRecord {
+
+    @Test
+    public void testDefaultValue() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("noDefault", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("defaultOfHello", 
RecordFieldType.STRING.getDataType(), "hello"));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        final Record record = new MapRecord(schema, values);
+
+        assertNull(record.getValue("noDefault"));
+        assertEquals("hello", record.getValue("defaultOfHello"));
+    }
+
+    @Test
+    public void testDefaultValueInGivenField() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("noDefault", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("defaultOfHello", 
RecordFieldType.STRING.getDataType(), "hello"));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        final Record record = new MapRecord(schema, values);
+
+        assertNull(record.getValue("noDefault"));
+        assertEquals("hello", record.getValue("defaultOfHello"));
+
+        final RecordField newField = new RecordField("noDefault", 
RecordFieldType.STRING.getDataType(), "new");
+        assertEquals("new", record.getValue(newField));
+    }
+
+    @Test
+    public void testIllegalDefaultValue() {
+        new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
+        new RecordField("hello", RecordFieldType.STRING.getDataType(), 
(Object) null);
+        new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
+        new RecordField("hello", RecordFieldType.INT.getDataType(), (Object) 
null);
+
+        try {
+            new RecordField("hello", RecordFieldType.INT.getDataType(), "foo");
+            Assert.fail("Was able to set a default value of \"foo\" for INT 
type");
+        } catch (final IllegalArgumentException expected) {
+            // expected
+        }
+    }
+
+    private Set<String> set(final String... values) {
+        final Set<String> set = new HashSet<>();
+        for (final String value : values) {
+            set.add(value);
+        }
+        return set;
+    }
+
+    @Test
+    public void testAliasOneValue() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("bar", 1);
+
+        final Record record = new MapRecord(schema, values);
+        assertEquals(1, record.getValue("foo"));
+        assertEquals(1, record.getValue("bar"));
+        assertEquals(1, record.getValue("baz"));
+    }
+
+    @Test
+    public void testAliasConflictingValues() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("bar", 1);
+        values.put("foo", null);
+
+        final Record record = new MapRecord(schema, values);
+        assertEquals(1, record.getValue("foo"));
+        assertEquals(1, record.getValue("bar"));
+        assertEquals(1, record.getValue("baz"));
+    }
+
+    @Test
+    public void testAliasConflictingAliasValues() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("baz", 1);
+        values.put("bar", 33);
+
+        final Record record = new MapRecord(schema, values);
+        assertEquals(33, record.getValue("foo"));
+        assertEquals(33, record.getValue("bar"));
+        assertEquals(33, record.getValue("baz"));
+    }
+
+    @Test
+    public void testAliasInGivenField() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("bar", 33);
+
+        final Record record = new MapRecord(schema, values);
+        assertEquals(33, record.getValue("foo"));
+        assertEquals(33, record.getValue("bar"));
+        assertEquals(33, record.getValue("baz"));
+
+        final RecordField noAlias = new RecordField("hello", 
RecordFieldType.STRING.getDataType());
+        assertNull(record.getValue(noAlias));
+
+        final RecordField withAlias = new RecordField("hello", 
RecordFieldType.STRING.getDataType(), null, set("baz"));
+        assertEquals(33, record.getValue(withAlias));
+        assertEquals("33", record.getAsString(withAlias, 
withAlias.getDataType().getFormat()));
+    }
+
+
+    @Test
+    public void testDefaultValueWithAliasValue() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        values.put("baz", 1);
+        values.put("bar", 33);
+
+        final Record record = new MapRecord(schema, values);
+        assertEquals(33, record.getValue("foo"));
+        assertEquals(33, record.getValue("bar"));
+        assertEquals(33, record.getValue("baz"));
+    }
+
+    @Test
+    public void testDefaultValueWithAliasesDefined() {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("foo", 
RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+        final Map<String, Object> values = new HashMap<>();
+        final Record record = new MapRecord(schema, values);
+        assertEquals("hello", record.getValue("foo"));
+        assertEquals("hello", record.getValue("bar"));
+        assertEquals("hello", record.getValue("baz"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-commons/pom.xml b/nifi-commons/pom.xml
index d0eb6b6..17a0fc2 100644
--- a/nifi-commons/pom.xml
+++ b/nifi-commons/pom.xml
@@ -33,11 +33,10 @@
         <module>nifi-socket-utils</module>
         <module>nifi-utils</module>
         <module>nifi-web-utils</module>
-        <module>nifi-processor-utilities</module>
         <module>nifi-write-ahead-log</module>
         <module>nifi-site-to-site-client</module>
         <module>nifi-hl7-query-language</module>
-        <module>nifi-hadoop-utils</module>
         <module>nifi-schema-utils</module>
+        <module>nifi-record</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
new file mode 100644
index 0000000..e376ba9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-extension-utils</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>nifi-hadoop-utils</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <!-- Other modules using nifi-hadoop-utils are expected to have the 
below dependencies available, typically through a NAR dependency -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/krb5.conf</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
new file mode 100644
index 0000000..af10079
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * All processors and controller services that need properties for Kerberos
+ * Principal and Keytab should obtain them through this class by calling:
+ *
+ * KerberosProperties props =
+ * KerberosProperties.create(NiFiProperties.getInstance())
+ *
+ * The properties can be accessed from the resulting KerberosProperties
+ * instance.
+ */
+public class KerberosProperties {
+
+    private final File kerberosConfigFile;
+    private final Validator kerberosConfigValidator;
+    private final PropertyDescriptor kerberosPrincipal;
+    private final PropertyDescriptor kerberosKeytab;
+
+    /**
+     * Instantiate a KerberosProperties object but keep in mind it is
+     * effectively a singleton because the krb5.conf file needs to be set as a
+     * system property which this constructor will take care of.
+     *
+     * @param kerberosConfigFile file of krb5.conf
+     */
+    public KerberosProperties(final File kerberosConfigFile) {
+        this.kerberosConfigFile = kerberosConfigFile;
+
+        this.kerberosConfigValidator = new Validator() {
+            @Override
+            public ValidationResult validate(String subject, String input, 
ValidationContext context) {
+                // Check that the Kerberos configuration is set
+                if (kerberosConfigFile == null) {
+                    return new ValidationResult.Builder()
+                            .subject(subject).input(input).valid(false)
+                            .explanation("you are missing the 
nifi.kerberos.krb5.file property which "
+                                    + "must be set in order to use Kerberos")
+                            .build();
+                }
+
+                // Check that the Kerberos configuration is readable
+                if (!kerberosConfigFile.canRead()) {
+                    return new 
ValidationResult.Builder().subject(subject).input(input).valid(false)
+                            .explanation(String.format("unable to read 
Kerberos config [%s], please make sure the path is valid "
+                                    + "and nifi has adequate permissions", 
kerberosConfigFile.getAbsoluteFile()))
+                            .build();
+                }
+
+                return new 
ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            }
+        };
+
+        this.kerberosPrincipal = new PropertyDescriptor.Builder()
+                .name("Kerberos Principal")
+                .required(false)
+                .description("Kerberos principal to authenticate as. Requires 
nifi.kerberos.krb5.file to be set in your nifi.properties")
+                .addValidator(kerberosConfigValidator)
+                .build();
+
+        this.kerberosKeytab = new PropertyDescriptor.Builder()
+                .name("Kerberos Keytab").required(false)
+                .description("Kerberos keytab associated with the principal. 
Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
+                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+                .addValidator(kerberosConfigValidator)
+                .build();
+    }
+
+    public File getKerberosConfigFile() {
+        return kerberosConfigFile;
+    }
+
+    public Validator getKerberosConfigValidator() {
+        return kerberosConfigValidator;
+    }
+
+    public PropertyDescriptor getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public PropertyDescriptor getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+
+    public static List<ValidationResult> validatePrincipalAndKeytab(final 
String subject, final Configuration config, final String principal, final 
String keytab, final ComponentLog logger) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        // if security is enabled then the keytab and principal are required
+        final boolean isSecurityEnabled = 
SecurityUtil.isSecurityEnabled(config);
+
+        final boolean blankPrincipal = (principal == null || 
principal.isEmpty());
+        if (isSecurityEnabled && blankPrincipal) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(subject)
+                    .explanation("Kerberos Principal must be provided when 
using a secure configuration")
+                    .build());
+        }
+
+        final boolean blankKeytab = (keytab == null || keytab.isEmpty());
+        if (isSecurityEnabled && blankKeytab) {
+            results.add(new ValidationResult.Builder()
+                    .valid(false)
+                    .subject(subject)
+                    .explanation("Kerberos Keytab must be provided when using 
a secure configuration")
+                    .build());
+        }
+
+        if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) {
+            logger.warn("Configuration does not have security enabled, Keytab 
and Principal will be ignored");
+        }
+
+        return results;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
new file mode 100644
index 0000000..bf922fe
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+/**
+ * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
+ *
+ * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which
+ * will re-login the user if the ticket expired or is close to expiry. Between
+ * relogin attempts this thread will sleep for the provided amount of time.
+ *
+ */
+public class KerberosTicketRenewer implements Runnable {
+
+    private final UserGroupInformation ugi;
+    private final long renewalPeriod;
+    private final ComponentLog logger;
+
+    private volatile boolean stopped = false;
+
+    /**
+     * @param ugi
+     *          the user to renew the ticket for
+     * @param renewalPeriod
+     *          the amount of time in milliseconds to wait between renewal 
attempts
+     * @param logger
+     *          the logger from the component that started the renewer
+     */
+    public KerberosTicketRenewer(final UserGroupInformation ugi, final long 
renewalPeriod, final ComponentLog logger) {
+        this.ugi = ugi;
+        this.renewalPeriod = renewalPeriod;
+        this.logger = logger;
+    }
+
+    @Override
+    public void run() {
+        stopped = false;
+        while (!stopped) {
+            try {
+                logger.debug("Invoking renewal attempt for Kerberos ticket");
+                // While we run this "frequently", the Hadoop implementation 
will only perform the login at 80% of ticket lifetime.
+                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                    ugi.checkTGTAndReloginFromKeytab();
+                    return null;
+                });
+            } catch (IOException e) {
+                logger.error("Failed to renew Kerberos ticket", e);
+            } catch (InterruptedException e) {
+                logger.error("Interrupted while attempting to renew Kerberos 
ticket", e);
+                Thread.currentThread().interrupt();
+                return;
+            }
+
+            logger.debug("current UGI {}", new Object[]{ugi});
+
+            // Wait for a bit before checking again.
+            try {
+                Thread.sleep(renewalPeriod);
+            } catch (InterruptedException e) {
+                logger.error("Renewal thread interrupted", e);
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+
+    public void stop() {
+        stopped = true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
new file mode 100644
index 0000000..fcb9032
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.hadoop;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.logging.ComponentLog;
+
+import java.io.IOException;
+
+/**
+ * Provides synchronized access to UserGroupInformation to avoid multiple 
processors/services from
+ * interfering with each other.
+ */
+public class SecurityUtil {
+    public static final String HADOOP_SECURITY_AUTHENTICATION = 
"hadoop.security.authentication";
+    public static final String KERBEROS = "kerberos";
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
performs the login for the given principal
+     * and keytab. All logins should happen through this class to ensure other 
threads are not concurrently modifying
+     * UserGroupInformation.
+     *
+     * @param config the configuration instance
+     * @param principal the principal to authenticate as
+     * @param keyTab the keytab to authenticate with
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginKerberos(final 
Configuration config, final String principal, final String keyTab)
+            throws IOException {
+        Validate.notNull(config);
+        Validate.notNull(principal);
+        Validate.notNull(keyTab);
+
+        UserGroupInformation.setConfiguration(config);
+        return 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), 
keyTab.trim());
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.getLoginUser().
+     * All logins should happen through this class to ensure other threads are 
not concurrently modifying
+     * UserGroupInformation.
+     *
+     * @param config the configuration instance
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginSimple(final 
Configuration config) throws IOException {
+        Validate.notNull(config);
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.getLoginUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and 
returns UserGroupInformation.isSecurityEnabled().
+     *
+     * All checks for isSecurityEnabled() should happen through this method.
+     *
+     * @param config the given configuration
+     *
+     * @return true if kerberos is enabled on the given configuration, false 
otherwise
+     *
+     */
+    public static boolean isSecurityEnabled(final Configuration config) {
+        Validate.notNull(config);
+        return 
KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
+    }
+
+    /**
+     * Start a thread that periodically attempts to renew the current Kerberos 
user's ticket.
+     *
+     * Callers of this method should store the reference to the 
KerberosTicketRenewer and call stop() to stop the thread.
+     *
+     * @param id
+     *          The unique identifier to use for the thread, can be the class 
name that started the thread
+     *              (i.e. PutHDFS, etc)
+     * @param ugi
+     *          The current Kerberos user.
+     * @param renewalPeriod
+     *          The amount of time between attempting renewals.
+     * @param logger
+     *          The logger to use with in the renewer
+     *
+     * @return the KerberosTicketRenewer Runnable
+     */
+    public static KerberosTicketRenewer startTicketRenewalThread(final String 
id, final UserGroupInformation ugi, final long renewalPeriod, final 
ComponentLog logger) {
+        final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, 
renewalPeriod, logger);
+
+        final Thread t = new Thread(renewer);
+        t.setName("Kerberos Ticket Renewal [" + id + "]");
+        t.start();
+
+        return renewer;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
new file mode 100644
index 0000000..0a8ed12
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import javax.net.SocketFactory;
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
 * This is a base class that is helpful when building processors interacting with HDFS.
 * It defines the common property descriptors (configuration resources, Kerberos settings,
 * compression, extra classpath entries) and holds the shared Hadoop resources.
 */
@RequiresInstanceClassLoading(cloneAncestorResources = true)
public abstract class AbstractHadoopProcessor extends AbstractProcessor {

    // properties
    public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder()
            .name("Hadoop Configuration Resources")
            .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop "
                    + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.")
            .required(false)
            .addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR)
            .build();

    public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder()
            .name("Directory")
            .description("The HDFS directory from which files should be read")
            .required(true)
            .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
            .expressionLanguageSupported(true)
            .build();

    public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
            .name("Compression codec")
            .required(true)
            .allowableValues(CompressionType.values())
            .defaultValue(CompressionType.NONE.toString())
            .build();

    public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder()
            .name("Kerberos Relogin Period").required(false)
            .description("Period of time which should pass before attempting a kerberos relogin")
            .defaultValue("4 hours")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder()
            .name("Additional Classpath Resources")
            .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. When specifying a " +
                    "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .dynamicallyModifiesClasspath(true)
            .build();

    // FlowFile attribute name used by subclasses to record the absolute HDFS path
    public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path";

    // guards login via UserGroupInformation in resetHDFSResources() (single thread at a time)
    private static final Object RESOURCES_LOCK = new Object();

    // relogin threshold (seconds, from KERBEROS_RELOGIN_PERIOD) and time of last login (seconds)
    private long kerberosReloginThreshold;
    private long lastKerberosReloginTime;
    protected KerberosProperties kerberosProperties;
    protected List<PropertyDescriptor> properties;
    private volatile File kerberosConfigFile = null;

    // variables shared by all threads of this processor
    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
    private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();

    // Holder of cached Configuration information so validation does not reload the same config over and over
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
+    @Override
+    protected void init(ProcessorInitializationContext context) {
+        hdfsResources.set(new HdfsResources(null, null, null));
+
+        kerberosConfigFile = context.getKerberosConfigurationFile();
+        kerberosProperties = getKerberosProperties(kerberosConfigFile);
+
+        List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(HADOOP_CONFIGURATION_RESOURCES);
+        props.add(kerberosProperties.getKerberosPrincipal());
+        props.add(kerberosProperties.getKerberosKeytab());
+        props.add(KERBEROS_RELOGIN_PERIOD);
+        props.add(ADDITIONAL_CLASSPATH_RESOURCES);
+        properties = Collections.unmodifiableList(props);
+    }
+
    /**
     * Creates the KerberosProperties instance used by this processor. Subclasses (and tests)
     * may override this to supply alternate Kerberos property definitions.
     *
     * @param kerberosConfigFile the Kerberos configuration file provided by the framework
     * @return a new KerberosProperties built from the given configuration file
     */
    protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
        return new KerberosProperties(kerberosConfigFile);
    }
+
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        // 'properties' is built once in init() and wrapped unmodifiable there
        return properties;
    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
validationContext) {
+        final String configResources = 
validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+        final String principal = 
validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
+        final String keytab = 
validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
+
+        final List<ValidationResult> results = new ArrayList<>();
+
+        if (!StringUtils.isBlank(configResources)) {
+            try {
+                ValidationResources resources = validationResourceHolder.get();
+
+                // if no resources in the holder, or if the holder has 
different resources loaded,
+                // then load the Configuration and set the new resources in 
the holder
+                if (resources == null || 
!configResources.equals(resources.getConfigResources())) {
+                    getLogger().debug("Reloading validation resources");
+                    final Configuration config = new 
ExtendedConfiguration(getLogger());
+                    
config.setClassLoader(Thread.currentThread().getContextClassLoader());
+                    resources = new ValidationResources(configResources, 
getConfigurationFromResources(config, configResources));
+                    validationResourceHolder.set(resources);
+                }
+
+                final Configuration conf = resources.getConfiguration();
+                results.addAll(KerberosProperties.validatePrincipalAndKeytab(
+                        this.getClass().getSimpleName(), conf, principal, 
keytab, getLogger()));
+
+            } catch (IOException e) {
+                results.add(new ValidationResult.Builder()
+                        .valid(false)
+                        .subject(this.getClass().getSimpleName())
+                        .explanation("Could not load Hadoop Configuration 
resources")
+                        .build());
+            }
+        }
+
+        return results;
+    }
+
+    /*
+     * If your subclass also has an @OnScheduled annotated method and you need 
hdfsResources in that method, then be sure to call 
super.abstractOnScheduled(context)
+     */
+    @OnScheduled
+    public final void abstractOnScheduled(ProcessContext context) throws 
IOException {
+        try {
+            // This value will be null when called from ListHDFS, because it 
overrides all of the default
+            // properties this processor sets. TODO: re-work ListHDFS to 
utilize Kerberos
+            if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != 
null) {
+                kerberosReloginThreshold = 
context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS);
+            }
+            HdfsResources resources = hdfsResources.get();
+            if (resources.getConfiguration() == null) {
+                final String configResources = 
context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue();
+                resources = resetHDFSResources(configResources, context);
+                hdfsResources.set(resources);
+            }
+        } catch (IOException ex) {
+            getLogger().error("HDFS Configuration error - {}", new Object[] { 
ex });
+            hdfsResources.set(new HdfsResources(null, null, null));
+            throw ex;
+        }
+    }
+
    @OnStopped
    public final void abstractOnStopped() {
        // Drop the Configuration/FileSystem/UGI references so they are rebuilt (picking up any
        // configuration changes) the next time the processor is scheduled.
        hdfsResources.set(new HdfsResources(null, null, null));
    }
+
+    private static Configuration getConfigurationFromResources(final 
Configuration config, String configResources) throws IOException {
+        boolean foundResources = false;
+        if (null != configResources) {
+            String[] resources = configResources.split(",");
+            for (String resource : resources) {
+                config.addResource(new Path(resource.trim()));
+                foundResources = true;
+            }
+        }
+
+        if (!foundResources) {
+            // check that at least 1 non-default resource is available on the 
classpath
+            String configStr = config.toString();
+            for (String resource : configStr.substring(configStr.indexOf(":") 
+ 1).split(",")) {
+                if (!resource.contains("default") && 
config.getResource(resource.trim()) != null) {
+                    foundResources = true;
+                    break;
+                }
+            }
+        }
+
+        if (!foundResources) {
+            throw new IOException("Could not find any of the " + 
HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath");
+        }
+        return config;
+    }
+
    /*
     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
     * Returns a fully-populated HdfsResources (config, filesystem, logged-in user).
     */
    HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException {
        Configuration config = new ExtendedConfiguration(getLogger());
        config.setClassLoader(Thread.currentThread().getContextClassLoader());

        getConfigurationFromResources(config, configResources);

        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
        checkHdfsUriForTimeout(config);

        // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete
        // restart
        String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme());
        config.set(disableCacheName, "true");

        // If kerberos is enabled, create the file system as the kerberos principal
        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
        FileSystem fs;
        UserGroupInformation ugi;
        synchronized (RESOURCES_LOCK) {
            if (SecurityUtil.isSecurityEnabled(config)) {
                String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
                String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
                ugi = SecurityUtil.loginKerberos(config, principal, keyTab);
                fs = getFileSystemAsUser(config, ugi);
                // record the login time (seconds); compared against kerberosReloginThreshold in isTicketOld()
                lastKerberosReloginTime = System.currentTimeMillis() / 1000;
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple(config);
                fs = getFileSystemAsUser(config, ugi);
            }
        }
        getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});

        final Path workingDir = fs.getWorkingDirectory();
        getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
                new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});

        return new HdfsResources(config, fs, ugi);
    }
+
    /**
     * This exists in order to allow unit tests to override it so that they don't take several
     * minutes waiting for UDP packets to be received.
     *
     * @param config the configuration to use
     * @return the FileSystem that is created for the given Configuration
     * @throws IOException if unable to create the FileSystem
     */
    protected FileSystem getFileSystem(final Configuration config) throws IOException {
        return FileSystem.get(config);
    }
+
+    protected FileSystem getFileSystemAsUser(final Configuration config, 
UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + 
e.getMessage());
+        }
+    }
+
    /*
     * Drastically reduce the timeout of a socket connection from the default in FileSystem.get()
     * by probing the namenode address directly with a 1 second connect timeout.
     */
    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
        URI hdfsUri = FileSystem.getDefaultUri(config);
        String address = hdfsUri.getAuthority();
        int port = hdfsUri.getPort();
        // nothing to probe when the default FS URI carries no host/port
        if (address == null || address.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
        Socket socket = null;
        try {
            socket = socketFactory.createSocket();
            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
        } finally {
            // best-effort close; connect failures propagate, close failures are ignored
            IOUtils.closeQuietly(socket);
        }
    }
+
+    /**
+     * Returns the configured CompressionCodec, or null if none is configured.
+     *
+     * @param context
+     *            the ProcessContext
+     * @param configuration
+     *            the Hadoop Configuration
+     * @return CompressionCodec or null
+     */
+    protected org.apache.hadoop.io.compress.CompressionCodec 
getCompressionCodec(ProcessContext context, Configuration configuration) {
+        org.apache.hadoop.io.compress.CompressionCodec codec = null;
+        if (context.getProperty(COMPRESSION_CODEC).isSet()) {
+            String compressionClassname = 
CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString();
+            CompressionCodecFactory ccf = new 
CompressionCodecFactory(configuration);
+            codec = ccf.getCodecByClassName(compressionClassname);
+        }
+
+        return codec;
+    }
+
+    /**
+     * Returns the relative path of the child that does not include the 
filename or the root path.
+     *
+     * @param root
+     *            the path to relativize from
+     * @param child
+     *            the path to relativize
+     * @return the relative path
+     */
+    public static String getPathDifference(final Path root, final Path child) {
+        final int depthDiff = child.depth() - root.depth();
+        if (depthDiff <= 1) {
+            return "".intern();
+        }
+        String lastRoot = root.getName();
+        Path childsParent = child.getParent();
+        final StringBuilder builder = new StringBuilder();
+        builder.append(childsParent.getName());
+        for (int i = (depthDiff - 3); i >= 0; i--) {
+            childsParent = childsParent.getParent();
+            String name = childsParent.getName();
+            if (name.equals(lastRoot) && 
childsParent.toString().endsWith(root.toString())) {
+                break;
+            }
+            builder.insert(0, Path.SEPARATOR).insert(0, name);
+        }
+        return builder.toString();
+    }
+
    /** @return the Hadoop Configuration from the shared resource holder; null until the processor is scheduled */
    protected Configuration getConfiguration() {
        return hdfsResources.get().getConfiguration();
    }
+
    /** @return the FileSystem from the shared resource holder, after a possible Kerberos relogin */
    protected FileSystem getFileSystem() {
        // trigger Relogin if necessary
        getUserGroupInformation();
        return hdfsResources.get().getFileSystem();
    }
+
    /**
     * Returns the UserGroupInformation from the shared resource holder, first attempting a
     * Kerberos relogin when the ticket age exceeds the configured threshold.
     *
     * @return the UserGroupInformation; null before the processor has been scheduled
     */
    protected UserGroupInformation getUserGroupInformation() {
        // if kerberos is enabled, check if the ticket should be renewed before returning
        UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation();
        if (userGroupInformation != null && isTicketOld()) {
            tryKerberosRelogin(userGroupInformation);
        }
        return userGroupInformation;
    }
+
+    protected void tryKerberosRelogin(UserGroupInformation ugi) {
+        try {
+            getLogger().info("Kerberos ticket age exceeds threshold [{} 
seconds] " +
+                "attempting to renew ticket for user {}", new Object[]{
+              kerberosReloginThreshold, ugi.getUserName()});
+            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                ugi.checkTGTAndReloginFromKeytab();
+                return null;
+            });
+            lastKerberosReloginTime = System.currentTimeMillis() / 1000;
+            getLogger().info("Kerberos relogin successful or ticket still 
valid");
+        } catch (IOException e) {
+            // Most likely case of this happening is ticket is expired and 
error getting a new one,
+            // meaning dfs operations would fail
+            getLogger().error("Kerberos relogin failed", e);
+            throw new ProcessException("Unable to renew kerberos ticket", e);
+        } catch (InterruptedException e) {
+            getLogger().error("Interrupted while attempting Kerberos relogin", 
e);
+            throw new ProcessException("Unable to renew kerberos ticket", e);
+        }
+    }
+
+    protected boolean isTicketOld() {
+        return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > 
kerberosReloginThreshold;
+    }
+
    /**
     * Immutable holder for the Hadoop Configuration, FileSystem, and UserGroupInformation shared
     * by all threads of this processor. Any member may be null before the processor is scheduled.
     */
    static protected class HdfsResources {
        private final Configuration configuration;
        private final FileSystem fileSystem;
        private final UserGroupInformation userGroupInformation;

        public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) {
            this.configuration = configuration;
            this.fileSystem = fileSystem;
            this.userGroupInformation = userGroupInformation;
        }

        public Configuration getConfiguration() {
            return configuration;
        }

        public FileSystem getFileSystem() {
            return fileSystem;
        }

        public UserGroupInformation getUserGroupInformation() {
            return userGroupInformation;
        }
    }
+
    /**
     * Pairs a parsed Configuration with the comma-separated resource string it was loaded from,
     * so customValidate() can skip reloading when the resource string has not changed.
     */
    static protected class ValidationResources {
        private final String configResources;
        private final Configuration configuration;

        public ValidationResources(String configResources, Configuration configuration) {
            this.configResources = configResources;
            this.configuration = configuration;
        }

        public String getConfigResources() {
            return configResources;
        }

        public Configuration getConfiguration() {
            return configuration;
        }
    }
+
+    /**
+     * Extending Hadoop Configuration to prevent it from caching classes that 
can't be found. Since users may be
+     * adding additional JARs to the classpath we don't want them to have to 
restart the JVM to be able to load
+     * something that was previously not found, but might now be available.
+     *
+     * Reference the original getClassByNameOrNull from Configuration.
+     */
+    static class ExtendedConfiguration extends Configuration {
+
+        private final ComponentLog logger;
+        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> 
CACHE_CLASSES = new WeakHashMap<>();
+
+        public ExtendedConfiguration(final ComponentLog logger) {
+            this.logger = logger;
+        }
+
+        public Class<?> getClassByNameOrNull(String name) {
+            final ClassLoader classLoader = getClassLoader();
+
+            Map<String, WeakReference<Class<?>>> map;
+            synchronized (CACHE_CLASSES) {
+                map = CACHE_CLASSES.get(classLoader);
+                if (map == null) {
+                    map = Collections.synchronizedMap(new WeakHashMap<>());
+                    CACHE_CLASSES.put(classLoader, map);
+                }
+            }
+
+            Class<?> clazz = null;
+            WeakReference<Class<?>> ref = map.get(name);
+            if (ref != null) {
+                clazz = ref.get();
+            }
+
+            if (clazz == null) {
+                try {
+                    clazz = Class.forName(name, true, classLoader);
+                } catch (ClassNotFoundException e) {
+                    logger.error(e.getMessage(), e);
+                    return null;
+                }
+                // two putters can race here, but they'll put the same class
+                map.put(name, new WeakReference<>(clazz));
+                return clazz;
+            } else {
+                // cache hit
+                return clazz;
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java
new file mode 100644
index 0000000..43e6e80
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+
+/**
+ * Compression Type Enum for Hadoop related processors.
+ */
+public enum CompressionType {
+    NONE,
+    DEFAULT,
+    BZIP,
+    GZIP,
+    LZ4,
+    SNAPPY,
+    AUTOMATIC;
+
+    @Override
+    public String toString() {
+        switch (this) {
+            case NONE: return "NONE";
+            case DEFAULT: return DefaultCodec.class.getName();
+            case BZIP: return BZip2Codec.class.getName();
+            case GZIP: return GzipCodec.class.getName();
+            case LZ4: return Lz4Codec.class.getName();
+            case SNAPPY: return SnappyCodec.class.getName();
+            case AUTOMATIC: return "Automatically Detected";
+        }
+        return null;
+    }
+
+}

Reply via email to