This is an automated email from the ASF dual-hosted git repository.

edbe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e52ae9  NIFI-5909 added optional settings for date, time, and 
timestamp formats used to write Records to Elasticsearch
3e52ae9 is described below

commit 3e52ae952d05867f3366dc4f796ae193a84b4b2c
Author: Alex Savitsky <[email protected]>
AuthorDate: Wed Dec 19 11:08:32 2018 -0500

    NIFI-5909 added optional settings for date, time, and timestamp formats 
used to write Records to Elasticsearch
    
    NIFI-5909 added content checks to the unit tests
    
    NIFI-5937 use explicit long value for test dates/times (to not depend on 
the timezone of test executor)
    
    NIFI-5937 tabs to spaces
    
    Fixing checkstyle violations introduced by 
https://github.com/apache/nifi/pull/3249 PR
    
    NIFI-5937 adjusted property descriptions for consistency; limited EL scope 
to variable registry; added an appropriate validator along with its Maven 
dependency; moved format initialization to @OnScheduled
    
    NIFI-5909 tabs to spaces
    
    Signed-off-by: Ed <[email protected]>
    
    This closes #3227
---
 .../nifi-elasticsearch-processors/pom.xml          |  5 ++
 .../elasticsearch/PutElasticsearchHttpRecord.java  | 57 ++++++++++++++++++++--
 .../TestPutElasticsearchHttpRecord.java            | 31 ++++++++++--
 .../nifi/processors/hive/PutHive3Streaming.java    |  2 -
 4 files changed, 85 insertions(+), 10 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
index d4536cd..4db59f6 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml
@@ -122,6 +122,11 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>jackson-databind</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-record-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 52de424..d431960 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -55,6 +55,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SimpleDateFormatValidator;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -178,6 +179,38 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
             .required(true)
             .build();
 
+    static final PropertyDescriptor DATE_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Date Format")
+            .description("Specifies the format to use when reading/writing 
Date fields. "
+                    + "If not specified, the default format '" + 
RecordFieldType.DATE.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date 
Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all 
separated by '/' characters, as in 01/01/2017).")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIME_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Time Format")
+            .description("Specifies the format to use when reading/writing 
Time fields. "
+                    + "If not specified, the default format '" + 
RecordFieldType.TIME.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date 
Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed 
by "
+                    + "a two-digit minute, followed by a two-digit second, all 
separated by ':' characters, as in 18:04:15).")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+    static final PropertyDescriptor TIMESTAMP_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("Timestamp Format")
+            .description("Specifies the format to use when reading/writing 
Timestamp fields. "
+                    + "If not specified, the default format '" + 
RecordFieldType.TIMESTAMP.getDefaultFormat() + "' is used. "
+                    + "If specified, the value must match the Java Simple Date 
Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+                    + "a two-digit day, followed by a four-digit year, all 
separated by '/' characters; and then followed by a two-digit hour in 24-hour 
format, followed by "
+                    + "a two-digit minute, followed by a two-digit second, all 
separated by ':' characters, as in 01/01/2017 18:04:15).")
+            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SimpleDateFormatValidator())
+            .required(false)
+            .build();
+
     private static final Set<Relationship> relationships;
     private static final List<PropertyDescriptor> propertyDescriptors;
 
@@ -186,6 +219,9 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
     private final JsonFactory factory = new JsonFactory();
 
     private volatile String nullSuppression;
+    private volatile String dateFormat;
+    private volatile String timeFormat;
+    private volatile String timestampFormat;
 
     static {
         final Set<Relationship> _rels = new HashSet<>();
@@ -202,6 +238,9 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
         descriptors.add(CHARSET);
         descriptors.add(INDEX_OP);
         descriptors.add(SUPPRESS_NULLS);
+        descriptors.add(DATE_FORMAT);
+        descriptors.add(TIME_FORMAT);
+        descriptors.add(TIMESTAMP_FORMAT);
 
         propertyDescriptors = Collections.unmodifiableList(descriptors);
     }
@@ -248,6 +287,18 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
     public void setup(ProcessContext context) {
         super.setup(context);
         recordPathCache = new RecordPathCache(10);
+        this.dateFormat = 
context.getProperty(DATE_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.dateFormat == null) {
+            this.dateFormat = RecordFieldType.DATE.getDefaultFormat();
+        }
+        this.timeFormat = 
context.getProperty(TIME_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timeFormat == null) {
+            this.timeFormat = RecordFieldType.TIME.getDefaultFormat();
+        }
+        this.timestampFormat = 
context.getProperty(TIMESTAMP_FORMAT).evaluateAttributeExpressions().getValue();
+        if (this.timestampFormat == null) {
+            this.timestampFormat = 
RecordFieldType.TIMESTAMP.getDefaultFormat();
+        }
     }
 
     @Override
@@ -486,7 +537,7 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
 
         switch (chosenDataType.getFieldType()) {
             case DATE: {
-                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.DATE.getDefaultFormat()));
+                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(this.dateFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, 
fieldName));
                 } else {
@@ -495,7 +546,7 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                 break;
             }
             case TIME: {
-                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.TIME.getDefaultFormat()));
+                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(this.timeFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, 
fieldName));
                 } else {
@@ -504,7 +555,7 @@ public class PutElasticsearchHttpRecord extends 
AbstractElasticsearchHttpProcess
                 break;
             }
             case TIMESTAMP: {
-                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(RecordFieldType.TIMESTAMP.getDefaultFormat()));
+                final String stringValue = 
DataTypeUtils.toString(coercedValue, () -> 
DataTypeUtils.getDateFormat(this.timestampFormat));
                 if (DataTypeUtils.isLongTypeCompatible(stringValue)) {
                     generator.writeNumber(DataTypeUtils.toLong(coercedValue, 
fieldName));
                 } else {
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index 2cc16c1..992e615 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -41,6 +41,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,18 +71,30 @@ public class TestPutElasticsearchHttpRecord {
             assertEquals(1, record.get("id"));
             assertEquals("reç1", record.get("name"));
             assertEquals(101, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(2, record.get("id"));
             assertEquals("ræc2", record.get("name"));
             assertEquals(102, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(3, record.get("id"));
             assertEquals("rèc3", record.get("name"));
             assertEquals(103, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         }, record -> {
             assertEquals(4, record.get("id"));
             assertEquals("rëc4", record.get("name"));
             assertEquals(104, record.get("code"));
+            assertEquals("20/12/2018", record.get("date"));
+            assertEquals("6:55 PM", record.get("time"));
+            assertEquals("20/12/2018 6:55 PM", record.get("ts"));
         });
         runner = TestRunners.newTestRunner(processor); // no failures
         generateTestData();
@@ -88,6 +103,9 @@ public class TestPutElasticsearchHttpRecord {
         runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
         runner.setProperty(PutElasticsearchHttpRecord.TYPE, "status");
         runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
+        runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
+        runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
+        runner.setProperty(PutElasticsearchHttpRecord.TIMESTAMP_FORMAT, 
"d/M/yyyy h:m a");
 
         runner.enqueue(new byte[0], new HashMap<String, String>() {{
             put("doc_id", "28039652140");
@@ -564,10 +582,13 @@ public class TestPutElasticsearchHttpRecord {
         parser.addSchemaField("id", RecordFieldType.INT);
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("code", RecordFieldType.INT);
-
-        parser.addRecord(1, "reç1", 101);
-        parser.addRecord(2, "ræc2", 102);
-        parser.addRecord(3, "rèc3", 103);
-        parser.addRecord(4, "rëc4", 104);
+        parser.addSchemaField("date", RecordFieldType.DATE);
+        parser.addSchemaField("time", RecordFieldType.TIME);
+        parser.addSchemaField("ts", RecordFieldType.TIMESTAMP);
+
+        parser.addRecord(1, "reç1", 101, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(2, "ræc2", 102, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(3, "rèc3", 103, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
+        parser.addRecord(4, "rëc4", 104, new Date(1545282000000L), new 
Time(68150000), new Timestamp(1545332150000L));
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
index 81916c6..affbe11 100644
--- 
a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
+++ 
b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/main/java/org/apache/nifi/processors/hive/PutHive3Streaming.java
@@ -17,11 +17,9 @@
 package org.apache.nifi.processors.hive;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.streaming.ConnectionError;
 import org.apache.hive.streaming.HiveStreamingConnection;
 import org.apache.hive.streaming.InvalidTable;

Reply via email to