http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java new file mode 100644 index 0000000..a81e843 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -0,0 +1,670 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.serialization.record.util;

import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;

import java.math.BigInteger;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.function.Consumer;

/**
 * Static utilities for coercing arbitrary Java values into the canonical Java
 * representation of a {@link RecordFieldType}, and for testing whether a value is
 * convertible to a given {@link DataType} at all.
 *
 * <p>All date/time parsing and formatting performed here uses the GMT time zone so
 * that conversions do not depend on the JVM's default time zone. Conversion failures
 * are reported via {@link IllegalTypeConversionException}.</p>
 */
public class DataTypeUtils {

    // Shared time zone for all date/time conversions; keeps results JVM-locale independent.
    private static final TimeZone gmt = TimeZone.getTimeZone("gmt");

    /**
     * Converts the given value to the Java type backing {@code dataType}, using the
     * default date/time/timestamp formats declared on {@link RecordFieldType}.
     *
     * @param value     the raw value to convert; may be null
     * @param dataType  the target type
     * @param fieldName the field name, used only for error messages
     * @return the converted value, or null if {@code value} is null
     * @throws IllegalTypeConversionException if the value cannot be converted
     */
    public static Object convertType(final Object value, final DataType dataType, final String fieldName) {
        return convertType(value, dataType, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(),
            RecordFieldType.TIMESTAMP.getDefaultFormat(), fieldName);
    }

    /**
     * Converts the given value to the Java type backing {@code dataType}, using the
     * supplied date/time/timestamp formats when String values must be parsed or rendered.
     *
     * @return the converted value; null if {@code value} is null or the field type is unknown
     * @throws IllegalTypeConversionException if the value cannot be converted
     */
    public static Object convertType(final Object value, final DataType dataType, final String dateFormat, final String timeFormat,
        final String timestampFormat, final String fieldName) {

        switch (dataType.getFieldType()) {
            case BIGINT:
                return toBigInt(value, fieldName);
            case BOOLEAN:
                return toBoolean(value, fieldName);
            case BYTE:
                return toByte(value, fieldName);
            case CHAR:
                return toCharacter(value, fieldName);
            case DATE:
                return toDate(value, dateFormat, fieldName);
            case DOUBLE:
                return toDouble(value, fieldName);
            case FLOAT:
                return toFloat(value, fieldName);
            case INT:
                return toInteger(value, fieldName);
            case LONG:
                return toLong(value, fieldName);
            case SHORT:
                return toShort(value, fieldName);
            case STRING:
                return toString(value, dateFormat, timeFormat, timestampFormat);
            case TIME:
                return toTime(value, timeFormat, fieldName);
            case TIMESTAMP:
                return toTimestamp(value, timestampFormat, fieldName);
            case ARRAY:
                return toArray(value, fieldName);
            case MAP:
                return toMap(value, fieldName);
            case RECORD:
                final RecordDataType recordType = (RecordDataType) dataType;
                final RecordSchema childSchema = recordType.getChildSchema();
                return toRecord(value, childSchema, fieldName);
            case CHOICE: {
                if (value == null) {
                    return null;
                }

                // Resolve a CHOICE by picking the first sub-type the value is compatible with.
                final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
                final DataType chosenDataType = chooseDataType(value, choiceDataType);
                if (chosenDataType == null) {
                    throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
                        + " for field " + fieldName + " to any of the following available Sub-Types for a Choice: " + choiceDataType.getPossibleSubTypes());
                }

                return convertType(value, chosenDataType, fieldName);
            }
        }

        return null;
    }

    /**
     * Returns true if {@code value} can be converted to {@code dataType} without error.
     * For CHOICE types this is true when any sub-type is compatible.
     */
    public static boolean isCompatibleDataType(final Object value, final DataType dataType) {
        switch (dataType.getFieldType()) {
            case ARRAY:
                return isArrayTypeCompatible(value);
            case BIGINT:
                return isBigIntTypeCompatible(value);
            case BOOLEAN:
                return isBooleanTypeCompatible(value);
            case BYTE:
                return isByteTypeCompatible(value);
            case CHAR:
                return isCharacterTypeCompatible(value);
            case DATE:
                return isDateTypeCompatible(value, dataType.getFormat());
            case DOUBLE:
                return isDoubleTypeCompatible(value);
            case FLOAT:
                return isFloatTypeCompatible(value);
            case INT:
                return isIntegerTypeCompatible(value);
            case LONG:
                return isLongTypeCompatible(value);
            case RECORD:
                return isRecordTypeCompatible(value);
            case SHORT:
                return isShortTypeCompatible(value);
            case TIME:
                return isTimeTypeCompatible(value, dataType.getFormat());
            case TIMESTAMP:
                return isTimestampTypeCompatible(value, dataType.getFormat());
            case STRING:
                return isStringTypeCompatible(value);
            case MAP:
                return isMapTypeCompatible(value);
            case CHOICE: {
                final DataType chosenDataType = chooseDataType(value, (ChoiceDataType) dataType);
                return chosenDataType != null;
            }
        }

        return false;
    }

    /**
     * Returns the first sub-type of the given CHOICE that {@code value} is compatible
     * with, or null if none match.
     */
    public static DataType chooseDataType(final Object value, final ChoiceDataType choiceType) {
        for (final DataType subType : choiceType.getPossibleSubTypes()) {
            if (isCompatibleDataType(value, subType)) {
                return subType;
            }
        }

        return null;
    }

    /**
     * Converts the given value to a {@link Record}. Records pass through unchanged;
     * Maps are coerced field-by-field against {@code recordSchema}. Map entries whose
     * key is null or not present in the schema are silently skipped.
     *
     * @throws IllegalTypeConversionException if the value is a Map but no schema was
     *         provided, or the value is neither a Record nor a Map
     */
    public static Record toRecord(final Object value, final RecordSchema recordSchema, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Record) {
            return ((Record) value);
        }

        if (value instanceof Map) {
            if (recordSchema == null) {
                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
                    + " to Record for field " + fieldName + " because the value is a Map but no Record Schema was provided");
            }

            final Map<?, ?> map = (Map<?, ?>) value;
            final Map<String, Object> coercedValues = new HashMap<>();

            for (final Map.Entry<?, ?> entry : map.entrySet()) {
                final Object keyValue = entry.getKey();
                if (keyValue == null) {
                    continue;
                }

                final String key = keyValue.toString();
                final Optional<DataType> desiredTypeOption = recordSchema.getDataType(key);
                if (!desiredTypeOption.isPresent()) {
                    continue;
                }

                final Object rawValue = entry.getValue();
                final Object coercedValue = convertType(rawValue, desiredTypeOption.get(), fieldName);
                coercedValues.put(key, coercedValue);
            }

            return new MapRecord(recordSchema, coercedValues);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Record for field " + fieldName);
    }

    /** Returns true if the value is a non-null {@link Record}. */
    public static boolean isRecordTypeCompatible(final Object value) {
        return value instanceof Record;
    }

    /**
     * Converts the given value to an Object array. Only values that already are
     * {@code Object[]} are accepted.
     *
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Object[] toArray(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Object[]) {
            return (Object[]) value;
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Object Array for field " + fieldName);
    }

    /** Returns true if the value is a non-null {@code Object[]}. */
    public static boolean isArrayTypeCompatible(final Object value) {
        return value instanceof Object[];
    }

    /**
     * Converts the given value to a {@code Map<String, Object>}. Maps whose keys are all
     * Strings are returned as-is (no copy); other Maps have their keys stringified via
     * {@code toString()} (null keys preserved). Records are flattened into a Map of
     * field name to value.
     *
     * @throws IllegalTypeConversionException if a Record has no schema, or the value is
     *         neither a Map nor a Record
     */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> toMap(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Map) {
            final Map<?, ?> original = (Map<?, ?>) value;

            boolean keysAreStrings = true;
            for (final Object key : original.keySet()) {
                if (!(key instanceof String)) {
                    keysAreStrings = false;
                    break; // one non-String key is enough to know a transform is required
                }
            }

            if (keysAreStrings) {
                return (Map<String, Object>) value;
            }

            final Map<String, Object> transformed = new HashMap<>();
            for (final Map.Entry<?, ?> entry : original.entrySet()) {
                final Object key = entry.getKey();
                if (key == null) {
                    transformed.put(null, entry.getValue());
                } else {
                    transformed.put(key.toString(), entry.getValue());
                }
            }

            return transformed;
        }

        if (value instanceof Record) {
            final Record record = (Record) value;
            final RecordSchema recordSchema = record.getSchema();
            if (recordSchema == null) {
                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type Record to Map for field " + fieldName
                    + " because Record does not have an associated Schema");
            }

            final Map<String, Object> map = new HashMap<>();
            for (final String recordFieldName : recordSchema.getFieldNames()) {
                map.put(recordFieldName, record.getValue(recordFieldName));
            }

            return map;
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Map for field " + fieldName);
    }

    /** Returns true if the value is a non-null {@link Map}. */
    public static boolean isMapTypeCompatible(final Object value) {
        return value instanceof Map;
    }

    /**
     * Converts the given value to a String. SQL Date/Time/Timestamp values are rendered
     * with their respective formats; any other java.util.Date falls back to the
     * timestamp format; everything else uses {@code toString()}.
     */
    public static String toString(final Object value, final String dateFormat, final String timeFormat, final String timestampFormat) {
        if (value == null) {
            return null;
        }

        if (value instanceof String) {
            return (String) value;
        }

        if (value instanceof java.sql.Date) {
            return getDateFormat(dateFormat).format((java.util.Date) value);
        }
        if (value instanceof java.sql.Time) {
            return getDateFormat(timeFormat).format((java.util.Date) value);
        }
        if (value instanceof java.sql.Timestamp) {
            return getDateFormat(timestampFormat).format((java.util.Date) value);
        }
        if (value instanceof java.util.Date) {
            return getDateFormat(timestampFormat).format((java.util.Date) value);
        }

        return value.toString();
    }

    /** Any non-null value can be rendered as a String. */
    public static boolean isStringTypeCompatible(final Object value) {
        return value != null;
    }

    /**
     * Converts the given value to a {@link java.sql.Date}. Numbers are treated as epoch
     * milliseconds; Strings are parsed with the given format (in GMT).
     *
     * @throws IllegalTypeConversionException if a String does not match the format, or
     *         the value is of an unsupported type
     */
    public static java.sql.Date toDate(final Object value, final String format, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Date) {
            return (Date) value;
        }

        if (value instanceof Number) {
            final long longValue = ((Number) value).longValue();
            return new Date(longValue);
        }

        if (value instanceof String) {
            try {
                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
                return new Date(utilDate.getTime());
            } catch (final ParseException e) {
                throw new IllegalTypeConversionException("Could not convert value [" + value
                    + "] of type java.lang.String to Date because the value is not in the expected date format: " + format + " for field " + fieldName);
            }
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Date for field " + fieldName);
    }

    /**
     * Returns true if the value is a Date, a Number (epoch millis), or a String that
     * parses under the given format.
     */
    public static boolean isDateTypeCompatible(final Object value, final String format) {
        if (value == null) {
            return false;
        }

        if (value instanceof java.util.Date || value instanceof Number) {
            return true;
        }

        if (value instanceof String) {
            try {
                getDateFormat(format).parse((String) value);
                return true;
            } catch (final ParseException e) {
                return false;
            }
        }

        return false;
    }

    /**
     * Converts the given value to a {@link Time}. Numbers are treated as epoch
     * milliseconds; Strings are parsed with the given format (in GMT).
     *
     * @throws IllegalTypeConversionException if a String does not match the format, or
     *         the value is of an unsupported type
     */
    public static Time toTime(final Object value, final String format, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Time) {
            return (Time) value;
        }

        if (value instanceof Number) {
            final long longValue = ((Number) value).longValue();
            return new Time(longValue);
        }

        if (value instanceof String) {
            try {
                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
                return new Time(utilDate.getTime());
            } catch (final ParseException e) {
                throw new IllegalTypeConversionException("Could not convert value [" + value
                    + "] of type java.lang.String to Time for field " + fieldName + " because the value is not in the expected date format: " + format);
            }
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Time for field " + fieldName);
    }

    // A new SimpleDateFormat is created per call because SimpleDateFormat is not thread-safe.
    private static DateFormat getDateFormat(final String format) {
        final DateFormat df = new SimpleDateFormat(format);
        df.setTimeZone(gmt);
        return df;
    }

    /** Same acceptance rules as {@link #isDateTypeCompatible(Object, String)}. */
    public static boolean isTimeTypeCompatible(final Object value, final String format) {
        return isDateTypeCompatible(value, format);
    }

    /**
     * Converts the given value to a {@link Timestamp}. Numbers are treated as epoch
     * milliseconds; Strings are parsed with the given format (in GMT).
     *
     * @throws IllegalTypeConversionException if a String does not match the format, or
     *         the value is of an unsupported type
     */
    public static Timestamp toTimestamp(final Object value, final String format, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }

        if (value instanceof Number) {
            final long longValue = ((Number) value).longValue();
            return new Timestamp(longValue);
        }

        if (value instanceof String) {
            try {
                final java.util.Date utilDate = getDateFormat(format).parse((String) value);
                return new Timestamp(utilDate.getTime());
            } catch (final ParseException e) {
                throw new IllegalTypeConversionException("Could not convert value [" + value
                    + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: " + format);
            }
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Timestamp for field " + fieldName);
    }

    /** Same acceptance rules as {@link #isDateTypeCompatible(Object, String)}. */
    public static boolean isTimestampTypeCompatible(final Object value, final String format) {
        return isDateTypeCompatible(value, format);
    }

    /**
     * Converts the given value to a {@link BigInteger}. Only BigInteger and Long values
     * are accepted.
     *
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static BigInteger toBigInt(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof BigInteger) {
            return (BigInteger) value;
        }
        if (value instanceof Long) {
            return BigInteger.valueOf((Long) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to BigInteger for field " + fieldName);
    }

    /**
     * Returns true if the value can be converted by {@link #toBigInt(Object, String)}.
     *
     * <p>BUG FIX: the original checked {@code value == null && (…instanceof…)}, which is
     * always false — BIGINT compatibility (and CHOICE resolution to BIGINT) could never
     * succeed. The check now mirrors the accepted types of {@code toBigInt}.</p>
     */
    public static boolean isBigIntTypeCompatible(final Object value) {
        return value instanceof BigInteger || value instanceof Long;
    }

    /**
     * Converts the given value to a Boolean. Strings "true"/"false" (case-insensitive)
     * are accepted.
     *
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Boolean toBoolean(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            final String string = (String) value;
            if (string.equalsIgnoreCase("true")) {
                return Boolean.TRUE;
            } else if (string.equalsIgnoreCase("false")) {
                return Boolean.FALSE;
            }
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Boolean for field " + fieldName);
    }

    /** Returns true for Booleans and the Strings "true"/"false" (case-insensitive). */
    public static boolean isBooleanTypeCompatible(final Object value) {
        if (value == null) {
            return false;
        }
        if (value instanceof Boolean) {
            return true;
        }
        if (value instanceof String) {
            final String string = (String) value;
            return string.equalsIgnoreCase("true") || string.equalsIgnoreCase("false");
        }
        return false;
    }

    /**
     * Converts the given value to a Double. Numbers are narrowed/widened via
     * {@link Number#doubleValue()}; Strings are parsed.
     *
     * @throws NumberFormatException if a String is not a valid double
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Double toDouble(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }

        if (value instanceof String) {
            return Double.parseDouble((String) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Double for field " + fieldName);
    }

    /** Returns true for Numbers and Strings that parse as a double. */
    public static boolean isDoubleTypeCompatible(final Object value) {
        return isNumberTypeCompatible(value, s -> Double.parseDouble(s));
    }

    /**
     * Shared compatibility check for the numeric types: accepts any Number, and accepts
     * a String when the supplied verifier parses it without throwing NumberFormatException.
     */
    private static boolean isNumberTypeCompatible(final Object value, final Consumer<String> stringValueVerifier) {
        if (value == null) {
            return false;
        }

        if (value instanceof Number) {
            return true;
        }

        if (value instanceof String) {
            try {
                stringValueVerifier.accept((String) value);
                return true;
            } catch (final NumberFormatException nfe) {
                return false;
            }
        }

        return false;
    }

    /**
     * Converts the given value to a Float. Numbers use {@link Number#floatValue()};
     * Strings are parsed.
     *
     * @throws NumberFormatException if a String is not a valid float
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Float toFloat(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).floatValue();
        }

        if (value instanceof String) {
            return Float.parseFloat((String) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Float for field " + fieldName);
    }

    /** Returns true for Numbers and Strings that parse as a float. */
    public static boolean isFloatTypeCompatible(final Object value) {
        return isNumberTypeCompatible(value, s -> Float.parseFloat(s));
    }

    /**
     * Converts the given value to a Long. Numbers use {@link Number#longValue()};
     * Strings are parsed; java.util.Date values yield their epoch millis.
     *
     * @throws NumberFormatException if a String is not a valid long
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Long toLong(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).longValue();
        }

        if (value instanceof String) {
            return Long.parseLong((String) value);
        }

        if (value instanceof java.util.Date) {
            return ((java.util.Date) value).getTime();
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Long for field " + fieldName);
    }

    /** Returns true for Numbers, java.util.Date, and Strings that parse as a long. */
    public static boolean isLongTypeCompatible(final Object value) {
        if (value == null) {
            return false;
        }

        if (value instanceof Number) {
            return true;
        }

        if (value instanceof java.util.Date) {
            return true;
        }

        if (value instanceof String) {
            try {
                Long.parseLong((String) value);
                return true;
            } catch (final NumberFormatException nfe) {
                return false;
            }
        }

        return false;
    }

    /**
     * Converts the given value to an Integer. Numbers use {@link Number#intValue()};
     * Strings are parsed.
     *
     * @throws NumberFormatException if a String is not a valid int
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Integer toInteger(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).intValue();
        }

        if (value instanceof String) {
            return Integer.parseInt((String) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Integer for field " + fieldName);
    }

    /** Returns true for Numbers and Strings that parse as an int. */
    public static boolean isIntegerTypeCompatible(final Object value) {
        return isNumberTypeCompatible(value, s -> Integer.parseInt(s));
    }

    /**
     * Converts the given value to a Short. Numbers use {@link Number#shortValue()};
     * Strings are parsed.
     *
     * @throws NumberFormatException if a String is not a valid short
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Short toShort(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).shortValue();
        }

        if (value instanceof String) {
            return Short.parseShort((String) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Short for field " + fieldName);
    }

    /** Returns true for Numbers and Strings that parse as a short. */
    public static boolean isShortTypeCompatible(final Object value) {
        return isNumberTypeCompatible(value, s -> Short.parseShort(s));
    }

    /**
     * Converts the given value to a Byte. Numbers use {@link Number#byteValue()};
     * Strings are parsed.
     *
     * @throws NumberFormatException if a String is not a valid byte
     * @throws IllegalTypeConversionException for any other non-null value
     */
    public static Byte toByte(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Number) {
            return ((Number) value).byteValue();
        }

        if (value instanceof String) {
            return Byte.parseByte((String) value);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Byte for field " + fieldName);
    }

    /** Returns true for Numbers and Strings that parse as a byte. */
    public static boolean isByteTypeCompatible(final Object value) {
        return isNumberTypeCompatible(value, s -> Byte.parseByte(s));
    }

    /**
     * Converts the given value to a Character. A non-empty CharSequence yields its
     * first character.
     *
     * @throws IllegalTypeConversionException for an empty CharSequence or any other
     *         non-null value
     */
    public static Character toCharacter(final Object value, final String fieldName) {
        if (value == null) {
            return null;
        }

        if (value instanceof Character) {
            return ((Character) value);
        }

        if (value instanceof CharSequence) {
            final CharSequence charSeq = (CharSequence) value;
            if (charSeq.length() == 0) {
                throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass()
                    + " to Character because it has a length of 0 for field " + fieldName);
            }

            return charSeq.charAt(0);
        }

        throw new IllegalTypeConversionException("Cannot convert value [" + value + "] of type " + value.getClass() + " to Character for field " + fieldName);
    }

    /** Returns true for Characters and non-empty CharSequences. */
    public static boolean isCharacterTypeCompatible(final Object value) {
        return value instanceof Character || (value instanceof CharSequence && ((CharSequence) value).length() > 0);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java new file mode 100644 index 0000000..38b5d20 --- /dev/null +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/IllegalTypeConversionException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.nifi.serialization.record.util;

/**
 * Thrown when a raw value cannot be coerced to the Java representation of a
 * requested Record field type (see {@code DataTypeUtils}).
 */
public class IllegalTypeConversionException extends RuntimeException {

    // RuntimeException is Serializable; declare an explicit serialVersionUID so that
    // serialized form does not break when the class is recompiled.
    private static final long serialVersionUID = 1L;

    /**
     * @param message description of the failed conversion, including the value,
     *                its type, and the target field
     */
    public IllegalTypeConversionException(final String message) {
        super(message);
    }

    /**
     * @param message description of the failed conversion
     * @param cause   the underlying failure (e.g. a parse error), preserved for callers
     */
    public IllegalTypeConversionException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
package org.apache.nifi.serialization;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.junit.Assert;
import org.junit.Test;

/**
 * Verifies that SimpleRecordSchema rejects field definitions whose names and/or
 * aliases collide with one another.
 */
public class TestSimpleRecordSchema {

    @Test
    public void testPreventsTwoFieldsWithSameAlias() {
        // Both fields claim the alias "bar".
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
        fieldList.add(new RecordField("goodbye", RecordFieldType.STRING.getDataType(), null, set("baz", "bar")));

        assertSchemaRejected(fieldList, "Was able to create two fields with same alias");
    }

    @Test
    public void testPreventsTwoFieldsWithSameName() {
        // Two fields both named "hello".
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
        fieldList.add(new RecordField("hello", RecordFieldType.STRING.getDataType()));

        assertSchemaRejected(fieldList, "Was able to create two fields with same name");
    }

    @Test
    public void testPreventsTwoFieldsWithConflictingNamesAliases() {
        // The second field's NAME collides with the first field's ALIAS "bar".
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("foo", "bar")));
        fieldList.add(new RecordField("bar", RecordFieldType.STRING.getDataType()));

        assertSchemaRejected(fieldList, "Was able to create two fields with conflicting names/aliases");
    }

    // Expects the SimpleRecordSchema constructor to throw IllegalArgumentException.
    private void assertSchemaRejected(final List<RecordField> fieldList, final String failureMessage) {
        try {
            new SimpleRecordSchema(fieldList);
            Assert.fail(failureMessage);
        } catch (final IllegalArgumentException expected) {
            // conflict correctly detected
        }
    }

    // Convenience builder for an alias set.
    private Set<String> set(final String... values) {
        return new HashSet<>(Arrays.asList(values));
    }

}
package org.apache.nifi.serialization.record;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.nifi.serialization.SimpleRecordSchema;
import org.junit.Assert;
import org.junit.Test;

/**
 * Exercises MapRecord's handling of default values and field aliases, including
 * precedence when a field name and several of its aliases are all present.
 */
public class TestMapRecord {

    @Test
    public void testDefaultValue() {
        // A field with no entry in the value map falls back to its declared default.
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
        fieldList.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Record record = new MapRecord(schema, new HashMap<>());

        assertNull(record.getValue("noDefault"));
        assertEquals("hello", record.getValue("defaultOfHello"));
    }

    @Test
    public void testDefaultValueInGivenField() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("noDefault", RecordFieldType.STRING.getDataType()));
        fieldList.add(new RecordField("defaultOfHello", RecordFieldType.STRING.getDataType(), "hello"));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Record record = new MapRecord(schema, new HashMap<>());

        assertNull(record.getValue("noDefault"));
        assertEquals("hello", record.getValue("defaultOfHello"));

        // A default supplied on an ad-hoc RecordField applies even when the schema's
        // own field has none.
        final RecordField newField = new RecordField("noDefault", RecordFieldType.STRING.getDataType(), "new");
        assertEquals("new", record.getValue(newField));
    }

    @Test
    public void testIllegalDefaultValue() {
        // These default values are all acceptable (or null)...
        new RecordField("hello", RecordFieldType.STRING.getDataType(), 84);
        new RecordField("hello", RecordFieldType.STRING.getDataType(), (Object) null);
        new RecordField("hello", RecordFieldType.INT.getDataType(), 84);
        new RecordField("hello", RecordFieldType.INT.getDataType(), (Object) null);

        // ...but a non-numeric String default for an INT field must be rejected.
        try {
            new RecordField("hello", RecordFieldType.INT.getDataType(), "foo");
            Assert.fail("Was able to set a default value of \"foo\" for INT type");
        } catch (final IllegalArgumentException expected) {
            // expected
        }
    }

    // Convenience builder for an alias set.
    private Set<String> set(final String... values) {
        return new HashSet<>(Arrays.asList(values));
    }

    @Test
    public void testAliasOneValue() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("bar", 1);

        // A value stored under an alias is visible through the field name and every alias.
        final Record record = new MapRecord(schema, valueMap);
        assertEquals(1, record.getValue("foo"));
        assertEquals(1, record.getValue("bar"));
        assertEquals(1, record.getValue("baz"));
    }

    @Test
    public void testAliasConflictingValues() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("bar", 1);
        valueMap.put("foo", null);

        // A null under the field name does not shadow the non-null alias value.
        final Record record = new MapRecord(schema, valueMap);
        assertEquals(1, record.getValue("foo"));
        assertEquals(1, record.getValue("bar"));
        assertEquals(1, record.getValue("baz"));
    }

    @Test
    public void testAliasConflictingAliasValues() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("baz", 1);
        valueMap.put("bar", 33);

        // When two aliases both have values, "bar" wins (alias declaration order).
        final Record record = new MapRecord(schema, valueMap);
        assertEquals(33, record.getValue("foo"));
        assertEquals(33, record.getValue("bar"));
        assertEquals(33, record.getValue("baz"));
    }

    @Test
    public void testAliasInGivenField() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), null, set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("bar", 33);

        final Record record = new MapRecord(schema, valueMap);
        assertEquals(33, record.getValue("foo"));
        assertEquals(33, record.getValue("bar"));
        assertEquals(33, record.getValue("baz"));

        // An ad-hoc field with no aliases does not see the value...
        final RecordField noAlias = new RecordField("hello", RecordFieldType.STRING.getDataType());
        assertNull(record.getValue(noAlias));

        // ...but an ad-hoc field aliased to "baz" does.
        final RecordField withAlias = new RecordField("hello", RecordFieldType.STRING.getDataType(), null, set("baz"));
        assertEquals(33, record.getValue(withAlias));
        assertEquals("33", record.getAsString(withAlias, withAlias.getDataType().getFormat()));
    }

    @Test
    public void testDefaultValueWithAliasValue() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Map<String, Object> valueMap = new HashMap<>();
        valueMap.put("baz", 1);
        valueMap.put("bar", 33);

        // An explicit alias value takes precedence over the field's default.
        final Record record = new MapRecord(schema, valueMap);
        assertEquals(33, record.getValue("foo"));
        assertEquals(33, record.getValue("bar"));
        assertEquals(33, record.getValue("baz"));
    }

    @Test
    public void testDefaultValueWithAliasesDefined() {
        final List<RecordField> fieldList = new ArrayList<>();
        fieldList.add(new RecordField("foo", RecordFieldType.STRING.getDataType(), "hello", set("bar", "baz")));

        final RecordSchema schema = new SimpleRecordSchema(fieldList);
        final Record record = new MapRecord(schema, new HashMap<>());

        // With no values at all, the default is reachable via the name and every alias.
        assertEquals("hello", record.getValue("foo"));
        assertEquals("hello", record.getValue("bar"));
        assertEquals("hello", record.getValue("baz"));
    }
}
You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-extension-utils</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-hadoop-utils</artifactId> + <version>1.2.0-SNAPSHOT</version> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <!-- Other modules using nifi-hadoop-utils are expected to have the below dependencies available, typically through a NAR dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/krb5.conf</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> +</project> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java new file mode 100644 index 0000000..af10079 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosProperties.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */
package org.apache.nifi.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Holds the shared Kerberos {@link PropertyDescriptor}s (Principal and Keytab) that
 * processors and controller services expose, plus a {@link Validator} that checks
 * the krb5.conf file supplied at construction time.
 *
 * Obtain an instance via the public constructor, passing the krb5.conf file
 * (typically resolved from the nifi.kerberos.krb5.file property).
 *
 * NOTE(review): earlier documentation referred to a static
 * {@code KerberosProperties.create(NiFiProperties)} factory and claimed the
 * constructor sets the krb5.conf system property; the code below offers only this
 * constructor and sets no system property — confirm that callers configure
 * {@code java.security.krb5.conf} elsewhere.
 */
public class KerberosProperties {

    // krb5.conf location; may be null when nifi.kerberos.krb5.file is unset,
    // in which case the validator below reports the principal/keytab as invalid
    private final File kerberosConfigFile;
    private final Validator kerberosConfigValidator;
    private final PropertyDescriptor kerberosPrincipal;
    private final PropertyDescriptor kerberosKeytab;

    /**
     * Creates the Kerberos property descriptors backed by the given config file.
     *
     * @param kerberosConfigFile file of krb5.conf, or null if not configured
     */
    public KerberosProperties(final File kerberosConfigFile) {
        this.kerberosConfigFile = kerberosConfigFile;

        this.kerberosConfigValidator = new Validator() {
            @Override
            public ValidationResult validate(String subject, String input, ValidationContext context) {
                // Check that the Kerberos configuration is set
                if (kerberosConfigFile == null) {
                    return new ValidationResult.Builder()
                            .subject(subject).input(input).valid(false)
                            .explanation("you are missing the nifi.kerberos.krb5.file property which "
                                    + "must be set in order to use Kerberos")
                            .build();
                }

                // Check that the Kerberos configuration is readable
                if (!kerberosConfigFile.canRead()) {
                    return new ValidationResult.Builder().subject(subject).input(input).valid(false)
                            .explanation(String.format("unable to read Kerberos config [%s], please make sure the path is valid "
                                    + "and nifi has adequate permissions", kerberosConfigFile.getAbsoluteFile()))
                            .build();
                }

                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
            }
        };

        this.kerberosPrincipal = new PropertyDescriptor.Builder()
                .name("Kerberos Principal")
                .required(false)
                .description("Kerberos principal to authenticate as. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
                .addValidator(kerberosConfigValidator)
                .build();

        // Keytab gets two validators: the file must exist AND the krb5.conf must be configured/readable
        this.kerberosKeytab = new PropertyDescriptor.Builder()
                .name("Kerberos Keytab").required(false)
                .description("Kerberos keytab associated with the principal. Requires nifi.kerberos.krb5.file to be set in your nifi.properties")
                .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
                .addValidator(kerberosConfigValidator)
                .build();
    }

    /** @return the krb5.conf file supplied at construction, possibly null */
    public File getKerberosConfigFile() {
        return kerberosConfigFile;
    }

    /** @return validator that checks the krb5.conf file is configured and readable */
    public Validator getKerberosConfigValidator() {
        return kerberosConfigValidator;
    }

    /** @return the "Kerberos Principal" property descriptor */
    public PropertyDescriptor getKerberosPrincipal() {
        return kerberosPrincipal;
    }

    /** @return the "Kerberos Keytab" property descriptor */
    public PropertyDescriptor getKerberosKeytab() {
        return kerberosKeytab;
    }

    /**
     * Validates the principal/keytab pair against the given Hadoop configuration:
     * when Kerberos security is enabled both must be provided; when it is not,
     * any supplied values are ignored (a warning is logged).
     *
     * @param subject   the subject to attach to each ValidationResult
     * @param config    the Hadoop configuration to inspect for security settings
     * @param principal the configured principal, may be null/empty
     * @param keytab    the configured keytab path, may be null/empty
     * @param logger    component logger used for the ignored-values warning
     * @return validation failures, empty when the combination is acceptable
     */
    public static List<ValidationResult> validatePrincipalAndKeytab(final String subject, final Configuration config, final String principal, final String keytab, final ComponentLog logger) {
        final List<ValidationResult> results = new ArrayList<>();

        // if security is enabled then the keytab and principal are required
        final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled(config);

        final boolean blankPrincipal = (principal == null || principal.isEmpty());
        if (isSecurityEnabled && blankPrincipal) {
            results.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject(subject)
                    .explanation("Kerberos Principal must be provided when using a secure configuration")
                    .build());
        }

        final boolean blankKeytab = (keytab == null || keytab.isEmpty());
        if (isSecurityEnabled && blankKeytab) {
            results.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject(subject)
                    .explanation("Kerberos Keytab must be provided when using a secure configuration")
                    .build());
        }

        if (!isSecurityEnabled && (!blankPrincipal || !blankKeytab)) {
            logger.warn("Configuration does not have security enabled, Keytab and Principal will be ignored");
        }

        return results;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java
----------------------------------------------------------------------
diff --git
a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java new file mode 100644 index 0000000..bf922fe --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/KerberosTicketRenewer.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.hadoop; + +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; + +/** + * Periodically attempts to renew the Kerberos user's ticket for the given UGI. + * + * This class will attempt to call ugi.checkTGTAndReloginFromKeytab() which + * will re-login the user if the ticket expired or is close to expiry. Between + * relogin attempts this thread will sleep for the provided amount of time. 
+ * + */ +public class KerberosTicketRenewer implements Runnable { + + private final UserGroupInformation ugi; + private final long renewalPeriod; + private final ComponentLog logger; + + private volatile boolean stopped = false; + + /** + * @param ugi + * the user to renew the ticket for + * @param renewalPeriod + * the amount of time in milliseconds to wait between renewal attempts + * @param logger + * the logger from the component that started the renewer + */ + public KerberosTicketRenewer(final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) { + this.ugi = ugi; + this.renewalPeriod = renewalPeriod; + this.logger = logger; + } + + @Override + public void run() { + stopped = false; + while (!stopped) { + try { + logger.debug("Invoking renewal attempt for Kerberos ticket"); + // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime. + ugi.doAs((PrivilegedExceptionAction<Void>) () -> { + ugi.checkTGTAndReloginFromKeytab(); + return null; + }); + } catch (IOException e) { + logger.error("Failed to renew Kerberos ticket", e); + } catch (InterruptedException e) { + logger.error("Interrupted while attempting to renew Kerberos ticket", e); + Thread.currentThread().interrupt(); + return; + } + + logger.debug("current UGI {}", new Object[]{ugi}); + + // Wait for a bit before checking again. 
+ try { + Thread.sleep(renewalPeriod); + } catch (InterruptedException e) { + logger.error("Renewal thread interrupted", e); + Thread.currentThread().interrupt(); + return; + } + } + } + + public void stop() { + stopped = true; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java new file mode 100644 index 0000000..fcb9032 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/hadoop/SecurityUtil.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.hadoop; + +import org.apache.commons.lang3.Validate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.logging.ComponentLog; + +import java.io.IOException; + +/** + * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from + * interfering with each other. + */ +public class SecurityUtil { + public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication"; + public static final String KERBEROS = "kerberos"; + + /** + * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal + * and keytab. All logins should happen through this class to ensure other threads are not concurrently modifying + * UserGroupInformation. + * + * @param config the configuration instance + * @param principal the principal to authenticate as + * @param keyTab the keytab to authenticate with + * + * @return the UGI for the given principal + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginKerberos(final Configuration config, final String principal, final String keyTab) + throws IOException { + Validate.notNull(config); + Validate.notNull(principal); + Validate.notNull(keyTab); + + UserGroupInformation.setConfiguration(config); + return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal.trim(), keyTab.trim()); + } + + /** + * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.getLoginUser(). + * All logins should happen through this class to ensure other threads are not concurrently modifying + * UserGroupInformation. 
+ * + * @param config the configuration instance + * + * @return the UGI for the given principal + * + * @throws IOException if login failed + */ + public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException { + Validate.notNull(config); + UserGroupInformation.setConfiguration(config); + return UserGroupInformation.getLoginUser(); + } + + /** + * Initializes UserGroupInformation with the given Configuration and returns UserGroupInformation.isSecurityEnabled(). + * + * All checks for isSecurityEnabled() should happen through this method. + * + * @param config the given configuration + * + * @return true if kerberos is enabled on the given configuration, false otherwise + * + */ + public static boolean isSecurityEnabled(final Configuration config) { + Validate.notNull(config); + return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION)); + } + + /** + * Start a thread that periodically attempts to renew the current Kerberos user's ticket. + * + * Callers of this method should store the reference to the KerberosTicketRenewer and call stop() to stop the thread. + * + * @param id + * The unique identifier to use for the thread, can be the class name that started the thread + * (i.e. PutHDFS, etc) + * @param ugi + * The current Kerberos user. + * @param renewalPeriod + * The amount of time between attempting renewals. 
+ * @param logger + * The logger to use with in the renewer + * + * @return the KerberosTicketRenewer Runnable + */ + public static KerberosTicketRenewer startTicketRenewalThread(final String id, final UserGroupInformation ugi, final long renewalPeriod, final ComponentLog logger) { + final KerberosTicketRenewer renewer = new KerberosTicketRenewer(ugi, renewalPeriod, logger); + + final Thread t = new Thread(renewer); + t.setName("Kerberos Ticket Renewal [" + id + "]"); + t.start(); + + return renewer; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java new file mode 100644 index 0000000..0a8ed12 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractHadoopProcessor.java @@ -0,0 +1,521 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.hadoop.KerberosProperties; +import org.apache.nifi.hadoop.SecurityUtil; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import javax.net.SocketFactory; +import java.io.File; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * This is a base class that is helpful when building processors interacting with HDFS. 
+ */ +@RequiresInstanceClassLoading(cloneAncestorResources = true) +public abstract class AbstractHadoopProcessor extends AbstractProcessor { + + // properties + public static final PropertyDescriptor HADOOP_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder() + .name("Hadoop Configuration Resources") + .description("A file or comma separated list of files which contains the Hadoop file system configuration. Without this, Hadoop " + + "will search the classpath for a 'core-site.xml' and 'hdfs-site.xml' file or will revert to a default configuration.") + .required(false) + .addValidator(HadoopValidators.ONE_OR_MORE_FILE_EXISTS_VALIDATOR) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name("Directory") + .description("The HDFS directory from which files should be read") + .required(true) + .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() + .name("Compression codec") + .required(true) + .allowableValues(CompressionType.values()) + .defaultValue(CompressionType.NONE.toString()) + .build(); + + public static final PropertyDescriptor KERBEROS_RELOGIN_PERIOD = new PropertyDescriptor.Builder() + .name("Kerberos Relogin Period").required(false) + .description("Period of time which should pass before attempting a kerberos relogin") + .defaultValue("4 hours") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor ADDITIONAL_CLASSPATH_RESOURCES = new PropertyDescriptor.Builder() + .name("Additional Classpath Resources") + .description("A comma-separated list of paths to files and/or directories that will be added to the classpath. 
When specifying a " + + "directory, all files with in the directory will be added to the classpath, but further sub-directories will not be included.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamicallyModifiesClasspath(true) + .build(); + + public static final String ABSOLUTE_HDFS_PATH_ATTRIBUTE = "absolute.hdfs.path"; + + private static final Object RESOURCES_LOCK = new Object(); + + private long kerberosReloginThreshold; + private long lastKerberosReloginTime; + protected KerberosProperties kerberosProperties; + protected List<PropertyDescriptor> properties; + private volatile File kerberosConfigFile = null; + + // variables shared by all threads of this processor + // Hadoop Configuration, Filesystem, and UserGroupInformation (optional) + private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>(); + + // Holder of cached Configuration information so validation does not reload the same config over and over + private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>(); + + @Override + protected void init(ProcessorInitializationContext context) { + hdfsResources.set(new HdfsResources(null, null, null)); + + kerberosConfigFile = context.getKerberosConfigurationFile(); + kerberosProperties = getKerberosProperties(kerberosConfigFile); + + List<PropertyDescriptor> props = new ArrayList<>(); + props.add(HADOOP_CONFIGURATION_RESOURCES); + props.add(kerberosProperties.getKerberosPrincipal()); + props.add(kerberosProperties.getKerberosKeytab()); + props.add(KERBEROS_RELOGIN_PERIOD); + props.add(ADDITIONAL_CLASSPATH_RESOURCES); + properties = Collections.unmodifiableList(props); + } + + protected KerberosProperties getKerberosProperties(File kerberosConfigFile) { + return new KerberosProperties(kerberosConfigFile); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected 
Collection<ValidationResult> customValidate(ValidationContext validationContext) { + final String configResources = validationContext.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); + final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + + final List<ValidationResult> results = new ArrayList<>(); + + if (!StringUtils.isBlank(configResources)) { + try { + ValidationResources resources = validationResourceHolder.get(); + + // if no resources in the holder, or if the holder has different resources loaded, + // then load the Configuration and set the new resources in the holder + if (resources == null || !configResources.equals(resources.getConfigResources())) { + getLogger().debug("Reloading validation resources"); + final Configuration config = new ExtendedConfiguration(getLogger()); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + resources = new ValidationResources(configResources, getConfigurationFromResources(config, configResources)); + validationResourceHolder.set(resources); + } + + final Configuration conf = resources.getConfiguration(); + results.addAll(KerberosProperties.validatePrincipalAndKeytab( + this.getClass().getSimpleName(), conf, principal, keytab, getLogger())); + + } catch (IOException e) { + results.add(new ValidationResult.Builder() + .valid(false) + .subject(this.getClass().getSimpleName()) + .explanation("Could not load Hadoop Configuration resources") + .build()); + } + } + + return results; + } + + /* + * If your subclass also has an @OnScheduled annotated method and you need hdfsResources in that method, then be sure to call super.abstractOnScheduled(context) + */ + @OnScheduled + public final void abstractOnScheduled(ProcessContext context) throws IOException { + try { + // This value will be null when called from ListHDFS, because it overrides 
all of the default + // properties this processor sets. TODO: re-work ListHDFS to utilize Kerberos + if (context.getProperty(KERBEROS_RELOGIN_PERIOD).getValue() != null) { + kerberosReloginThreshold = context.getProperty(KERBEROS_RELOGIN_PERIOD).asTimePeriod(TimeUnit.SECONDS); + } + HdfsResources resources = hdfsResources.get(); + if (resources.getConfiguration() == null) { + final String configResources = context.getProperty(HADOOP_CONFIGURATION_RESOURCES).getValue(); + resources = resetHDFSResources(configResources, context); + hdfsResources.set(resources); + } + } catch (IOException ex) { + getLogger().error("HDFS Configuration error - {}", new Object[] { ex }); + hdfsResources.set(new HdfsResources(null, null, null)); + throw ex; + } + } + + @OnStopped + public final void abstractOnStopped() { + hdfsResources.set(new HdfsResources(null, null, null)); + } + + private static Configuration getConfigurationFromResources(final Configuration config, String configResources) throws IOException { + boolean foundResources = false; + if (null != configResources) { + String[] resources = configResources.split(","); + for (String resource : resources) { + config.addResource(new Path(resource.trim())); + foundResources = true; + } + } + + if (!foundResources) { + // check that at least 1 non-default resource is available on the classpath + String configStr = config.toString(); + for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) { + if (!resource.contains("default") && config.getResource(resource.trim()) != null) { + foundResources = true; + break; + } + } + } + + if (!foundResources) { + throw new IOException("Could not find any of the " + HADOOP_CONFIGURATION_RESOURCES.getName() + " on the classpath"); + } + return config; + } + + /* + * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources. 
+ */ + HdfsResources resetHDFSResources(String configResources, ProcessContext context) throws IOException { + Configuration config = new ExtendedConfiguration(getLogger()); + config.setClassLoader(Thread.currentThread().getContextClassLoader()); + + getConfigurationFromResources(config, configResources); + + // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout + checkHdfsUriForTimeout(config); + + // disable caching of Configuration and FileSystem objects, else we cannot reconfigure the processor without a complete + // restart + String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri(config).getScheme()); + config.set(disableCacheName, "true"); + + // If kerberos is enabled, create the file system as the kerberos principal + // -- use RESOURCE_LOCK to guarantee UserGroupInformation is accessed by only a single thread at at time + FileSystem fs; + UserGroupInformation ugi; + synchronized (RESOURCES_LOCK) { + if (SecurityUtil.isSecurityEnabled(config)) { + String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue(); + String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue(); + ugi = SecurityUtil.loginKerberos(config, principal, keyTab); + fs = getFileSystemAsUser(config, ugi); + lastKerberosReloginTime = System.currentTimeMillis() / 1000; + } else { + config.set("ipc.client.fallback-to-simple-auth-allowed", "true"); + config.set("hadoop.security.authentication", "simple"); + ugi = SecurityUtil.loginSimple(config); + fs = getFileSystemAsUser(config, ugi); + } + } + getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi}); + + final Path workingDir = fs.getWorkingDirectory(); + getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", + new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), 
config.toString()}); + + return new HdfsResources(config, fs, ugi); + } + + /** + * This exists in order to allow unit tests to override it so that they don't take several minutes waiting for UDP packets to be received + * + * @param config + * the configuration to use + * @return the FileSystem that is created for the given Configuration + * @throws IOException + * if unable to create the FileSystem + */ + protected FileSystem getFileSystem(final Configuration config) throws IOException { + return FileSystem.get(config); + } + + protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException { + try { + return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + @Override + public FileSystem run() throws Exception { + return FileSystem.get(config); + } + }); + } catch (InterruptedException e) { + throw new IOException("Unable to create file system: " + e.getMessage()); + } + } + + /* + * Drastically reduce the timeout of a socket connection from the default in FileSystem.get() + */ + protected void checkHdfsUriForTimeout(Configuration config) throws IOException { + URI hdfsUri = FileSystem.getDefaultUri(config); + String address = hdfsUri.getAuthority(); + int port = hdfsUri.getPort(); + if (address == null || address.isEmpty() || port < 0) { + return; + } + InetSocketAddress namenode = NetUtils.createSocketAddr(address, port); + SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config); + Socket socket = null; + try { + socket = socketFactory.createSocket(); + NetUtils.connect(socket, namenode, 1000); // 1 second timeout + } finally { + IOUtils.closeQuietly(socket); + } + } + + /** + * Returns the configured CompressionCodec, or null if none is configured. 
+ * + * @param context + * the ProcessContext + * @param configuration + * the Hadoop Configuration + * @return CompressionCodec or null + */ + protected org.apache.hadoop.io.compress.CompressionCodec getCompressionCodec(ProcessContext context, Configuration configuration) { + org.apache.hadoop.io.compress.CompressionCodec codec = null; + if (context.getProperty(COMPRESSION_CODEC).isSet()) { + String compressionClassname = CompressionType.valueOf(context.getProperty(COMPRESSION_CODEC).getValue()).toString(); + CompressionCodecFactory ccf = new CompressionCodecFactory(configuration); + codec = ccf.getCodecByClassName(compressionClassname); + } + + return codec; + } + + /** + * Returns the relative path of the child that does not include the filename or the root path. + * + * @param root + * the path to relativize from + * @param child + * the path to relativize + * @return the relative path + */ + public static String getPathDifference(final Path root, final Path child) { + final int depthDiff = child.depth() - root.depth(); + if (depthDiff <= 1) { + return "".intern(); + } + String lastRoot = root.getName(); + Path childsParent = child.getParent(); + final StringBuilder builder = new StringBuilder(); + builder.append(childsParent.getName()); + for (int i = (depthDiff - 3); i >= 0; i--) { + childsParent = childsParent.getParent(); + String name = childsParent.getName(); + if (name.equals(lastRoot) && childsParent.toString().endsWith(root.toString())) { + break; + } + builder.insert(0, Path.SEPARATOR).insert(0, name); + } + return builder.toString(); + } + + protected Configuration getConfiguration() { + return hdfsResources.get().getConfiguration(); + } + + protected FileSystem getFileSystem() { + // trigger Relogin if necessary + getUserGroupInformation(); + return hdfsResources.get().getFileSystem(); + } + + protected UserGroupInformation getUserGroupInformation() { + // if kerberos is enabled, check if the ticket should be renewed before returning + 
UserGroupInformation userGroupInformation = hdfsResources.get().getUserGroupInformation(); + if (userGroupInformation != null && isTicketOld()) { + tryKerberosRelogin(userGroupInformation); + } + return userGroupInformation; + } + + protected void tryKerberosRelogin(UserGroupInformation ugi) { + try { + getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " + + "attempting to renew ticket for user {}", new Object[]{ + kerberosReloginThreshold, ugi.getUserName()}); + ugi.doAs((PrivilegedExceptionAction<Void>) () -> { + ugi.checkTGTAndReloginFromKeytab(); + return null; + }); + lastKerberosReloginTime = System.currentTimeMillis() / 1000; + getLogger().info("Kerberos relogin successful or ticket still valid"); + } catch (IOException e) { + // Most likely case of this happening is ticket is expired and error getting a new one, + // meaning dfs operations would fail + getLogger().error("Kerberos relogin failed", e); + throw new ProcessException("Unable to renew kerberos ticket", e); + } catch (InterruptedException e) { + getLogger().error("Interrupted while attempting Kerberos relogin", e); + throw new ProcessException("Unable to renew kerberos ticket", e); + } + } + + protected boolean isTicketOld() { + return (System.currentTimeMillis() / 1000 - lastKerberosReloginTime) > kerberosReloginThreshold; + } + + static protected class HdfsResources { + private final Configuration configuration; + private final FileSystem fileSystem; + private final UserGroupInformation userGroupInformation; + + public HdfsResources(Configuration configuration, FileSystem fileSystem, UserGroupInformation userGroupInformation) { + this.configuration = configuration; + this.fileSystem = fileSystem; + this.userGroupInformation = userGroupInformation; + } + + public Configuration getConfiguration() { + return configuration; + } + + public FileSystem getFileSystem() { + return fileSystem; + } + + public UserGroupInformation getUserGroupInformation() { + return userGroupInformation; 
+ } + } + + static protected class ValidationResources { + private final String configResources; + private final Configuration configuration; + + public ValidationResources(String configResources, Configuration configuration) { + this.configResources = configResources; + this.configuration = configuration; + } + + public String getConfigResources() { + return configResources; + } + + public Configuration getConfiguration() { + return configuration; + } + } + + /** + * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be + * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load + * something that was previously not found, but might now be available. + * + * Reference the original getClassByNameOrNull from Configuration. + */ + static class ExtendedConfiguration extends Configuration { + + private final ComponentLog logger; + private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> CACHE_CLASSES = new WeakHashMap<>(); + + public ExtendedConfiguration(final ComponentLog logger) { + this.logger = logger; + } + + public Class<?> getClassByNameOrNull(String name) { + final ClassLoader classLoader = getClassLoader(); + + Map<String, WeakReference<Class<?>>> map; + synchronized (CACHE_CLASSES) { + map = CACHE_CLASSES.get(classLoader); + if (map == null) { + map = Collections.synchronizedMap(new WeakHashMap<>()); + CACHE_CLASSES.put(classLoader, map); + } + } + + Class<?> clazz = null; + WeakReference<Class<?>> ref = map.get(name); + if (ref != null) { + clazz = ref.get(); + } + + if (clazz == null) { + try { + clazz = Class.forName(name, true, classLoader); + } catch (ClassNotFoundException e) { + logger.error(e.getMessage(), e); + return null; + } + // two putters can race here, but they'll put the same class + map.put(name, new WeakReference<>(clazz)); + return clazz; + } else { + // cache hit + return clazz; + } + } + + } + +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java new file mode 100644 index 0000000..43e6e80 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-hadoop-utils/src/main/java/org/apache/nifi/processors/hadoop/CompressionType.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.hadoop; + +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.io.compress.GzipCodec; +import org.apache.hadoop.io.compress.Lz4Codec; +import org.apache.hadoop.io.compress.SnappyCodec; + +/** + * Compression Type Enum for Hadoop related processors. 
+ */ +public enum CompressionType { + NONE, + DEFAULT, + BZIP, + GZIP, + LZ4, + SNAPPY, + AUTOMATIC; + + @Override + public String toString() { + switch (this) { + case NONE: return "NONE"; + case DEFAULT: return DefaultCodec.class.getName(); + case BZIP: return BZip2Codec.class.getName(); + case GZIP: return GzipCodec.class.getName(); + case LZ4: return Lz4Codec.class.getName(); + case SNAPPY: return SnappyCodec.class.getName(); + case AUTOMATIC: return "Automatically Detected"; + } + return null; + } + +}
