exceptionfactory commented on a change in pull request #5476:
URL: https://github.com/apache/nifi/pull/5476#discussion_r744967480



##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/pom.xml
##########
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-geohash-bundle</artifactId>
+        <version>1.15.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-geohash-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>

Review comment:
       For clearer organization, it would be helpful to move all of the `test` 
scope dependencies after the standard compile dependencies.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GeohashRecordTest {
+
+    private final TestRunner runner = 
TestRunners.newTestRunner(GeohashRecord.class);

Review comment:
       To avoid unexpected behavior, I recommend removing the `final` keyword 
and moving the `newTestRunner()` assignment into the `setUp()` method.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())

Review comment:
       Is there a reason why this property does not support expression 
language?  It looks like most of the other properties support FlowFile 
attributes with EL.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")

Review comment:
       ```suggestion
               .description("With the SPLIT strategy, the original input 
flowfile will be sent to this relationship regardless of whether it was 
enriched or not.")
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "

Review comment:
       ```suggestion
                       + "SPLIT will separate the records that have been 
enriched from those have not and send them to success, while unenriched records 
will be sent to failure; "
   ```

##########
File path: nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/pom.xml
##########
@@ -0,0 +1,98 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-geohash-bundle</artifactId>
+        <version>1.15.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-geohash-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.15.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>ch.hsr</groupId>
+            <artifactId>geohash</artifactId>
+            <version>1.4.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>

Review comment:
       This dependency can use the `provided` scope since it is included in the 
parent `nifi-standard-services-api-nar`.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")

Review comment:
       ```suggestion
                       + "REQUIRE_ALL_ENRICHED will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "

Review comment:
       ```suggestion
                       + "SKIP_UNENRICHED will route a flowfile to success if 
any of its records is enriched, otherwise it will be sent to failure. "
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")
+            .build();
+
+    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH
+    ));
+
+    private final RecordPathCache cache = new RecordPathCache(100);
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(ROUTING_STRATEGY);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
+        final RoutingStrategy routingStrategy = 
RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? 
session.create(input) : null;
+
+        try (InputStream is = session.read(input);
+             RecordReader reader = readerFactory.createRecordReader(input, is, 
getLogger());
+             OutputStream os = session.write(output);
+             RecordSetWriter writer = writerFactory.createWriter(getLogger(), 
writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
+             OutputStream osNotFound = routingStrategy == 
RoutingStrategy.SPLIT ? session.write(notFound) : null;
+             RecordSetWriter notFoundWriter = routingStrategy == 
RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), 
reader.getSchema(), osNotFound, notFound) : null) {

Review comment:
       Recommend adding `final` to these declarations:
   ```suggestion
           try (final InputStream is = session.read(input);
                final RecordReader reader = 
readerFactory.createRecordReader(input, is, getLogger());
                final OutputStream os = session.write(output);
                final RecordSetWriter writer = 
writerFactory.createWriter(getLogger(), 
writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
                final OutputStream osNotFound = routingStrategy == 
RoutingStrategy.SPLIT ? session.write(notFound) : null;
                final RecordSetWriter notFoundWriter = routingStrategy == 
RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), 
reader.getSchema(), osNotFound, notFound) : null) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/test/java/org/apache/nifi/processors/geohash/GeohashRecordTest.java
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.serialization.record.MockSchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GeohashRecordTest {
+
+    private final TestRunner runner = 
TestRunners.newTestRunner(GeohashRecord.class);
+
+    @BeforeEach
+    public void setUp() throws InitializationException {
+        ControllerService reader = new JsonTreeReader();
+        ControllerService writer = new JsonRecordSetWriter();
+        ControllerService registry = new MockSchemaRegistry();
+        runner.addControllerService("reader", reader);
+        runner.addControllerService("writer", writer);
+        runner.addControllerService("registry", registry);
+
+        try (InputStream is = 
getClass().getResourceAsStream("/record_schema.avsc")) {
+            String raw = IOUtils.toString(is, "UTF-8");
+            RecordSchema parsed = AvroTypeUtil.createSchema(new 
Schema.Parser().parse(raw));
+            ((MockSchemaRegistry) registry).addSchema("record", parsed);
+
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+
+        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, 
"registry");
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, 
SchemaAccessUtils.SCHEMA_NAME_PROPERTY);
+        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, 
"registry");
+
+        runner.setProperty(GeohashRecord.RECORD_READER, "reader");
+        runner.setProperty(GeohashRecord.RECORD_WRITER, "writer");
+        runner.enableControllerService(registry);
+        runner.enableControllerService(reader);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(GeohashRecord.LATITUDE_RECORD_PATH, "/latitude");
+        runner.setProperty(GeohashRecord.LONGITUDE_RECORD_PATH, "/longitude");
+        runner.setProperty(GeohashRecord.GEOHASH_RECORD_PATH, "/geohash");
+        runner.setProperty(GeohashRecord.GEOHASH_FORMAT, 
GeohashRecord.GeohashFormat.BASE_32.toString());
+        runner.setProperty(GeohashRecord.GEOHASH_LEVEL, "12");
+    }
+
+    private void commonTest(String path, int failure, int success, int 
original) {

Review comment:
       For clarity of implementation, recommend renaming this to something like 
`run()` or `assertTransfers()`.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {

Review comment:
       An additional `WritesAttributes` annotation should be added to indicate 
the attributes that can be written during processing, such as `record.count`.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")
+            .build();
+
+    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH
+    ));
+
+    private final RecordPathCache cache = new RecordPathCache(100);
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(ROUTING_STRATEGY);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
+        final RoutingStrategy routingStrategy = 
RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? 
session.create(input) : null;
+
+        try (InputStream is = session.read(input);
+             RecordReader reader = readerFactory.createRecordReader(input, is, 
getLogger());
+             OutputStream os = session.write(output);
+             RecordSetWriter writer = writerFactory.createWriter(getLogger(), 
writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
+             OutputStream osNotFound = routingStrategy == 
RoutingStrategy.SPLIT ? session.write(notFound) : null;
+             RecordSetWriter notFoundWriter = routingStrategy == 
RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), 
reader.getSchema(), osNotFound, notFound) : null) {
+
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = 
context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            Record record;
+
+            //The overall relationship used by the Require_All_Enriched and 
Skip_Unenriched routing strategies to transfer Flowfiles.
+            //For the Split_Enriched_Unenriched strategy, the transfer of 
Flowfiles does not rely on this overall relationship. Instead, each individual 
record will be sent to success or failure.
+            Relationship targetRelationship = routingStrategy == 
RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE;
+
+            writer.beginRecordSet();
+
+            if (notFoundWriter != null) {
+                notFoundWriter.beginRecordSet();
+            }
+
+            int foundCount = 0;
+            int notFoundCount = 0;
+
+            int level = context.getProperty(GEOHASH_LEVEL).asInteger();
+            final String rawLatitudePath = 
context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+            final String rawLongitudePath = 
context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+            final String rawgeohashPath = 
context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+            while ((record = reader.nextRecord()) != null) {
+                boolean updated = false;
+                if (encode) {
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, 
longitudePath, record, format, level);
+                    updated = updateRecord(GEOHASH_RECORD_PATH, 
encodedGeohash, record, paths);
+                } else {
+                    WGS84Point decodedPoint = 
getDecodedPointFromGeohash(geohashPath, record, format);
+                    if (decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                && updateRecord(LONGITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) {
+                    if (!updated) { //If the routing strategy is 
Require_All_Enriched and there exists a record that is not updated, the entire 
flowfile should be route to REL_FAILURE
+                        targetRelationship = REL_FAILURE;
+                    }
+                } else {
+                    if (updated) {
+                        //If the routing strategy is Skip_Unenriched and there 
exists a record that is updated, the entire flowfile should be route to 
REL_SUCCESS
+                        //If the routing strategy is Split_Enriched_Unenriched 
and there exists a record that is updated, this record should be route to 
REL_SUCCESS
+                        targetRelationship = REL_SUCCESS;
+                    }
+                }
+
+                if (routingStrategy != RoutingStrategy.SPLIT || updated) {
+                    writer.write(record);
+                    foundCount++;
+                } else { //if the routing strategy is 
Split_Enriched_Unenriched and the record is not updated
+                    notFoundWriter.write(record);
+                    notFoundCount++;
+                }
+            }
+
+            final WriteResult writeResult = writer.finishRecordSet();
+            writer.close();
+
+            WriteResult notFoundWriterResult = null;
+
+            if (notFoundWriter != null) {
+                notFoundWriterResult = notFoundWriter.finishRecordSet();
+                notFoundWriter.close();
+            }
+
+            is.close();
+            os.close();
+
+            if (osNotFound != null) {
+                osNotFound.close();
+            }

Review comment:
       Can those `close()` method calls be removed since the streams are 
declared in the enclosing `try` block and should be closed automatically?

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")
+            .build();
+
+    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH
+    ));
+
+    private final RecordPathCache cache = new RecordPathCache(100);
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(ROUTING_STRATEGY);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
+        final RoutingStrategy routingStrategy = 
RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());

Review comment:
       The Property Descriptors indicate that these properties support 
expression language, but `evaluateAttributeExpressions()` is not called.

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")
+            .build();
+
+    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH
+    ));
+
+    private final RecordPathCache cache = new RecordPathCache(100);
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(ROUTING_STRATEGY);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
+        final RoutingStrategy routingStrategy = 
RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? 
session.create(input) : null;
+
+        try (InputStream is = session.read(input);
+             RecordReader reader = readerFactory.createRecordReader(input, is, 
getLogger());
+             OutputStream os = session.write(output);
+             RecordSetWriter writer = writerFactory.createWriter(getLogger(), 
writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
+             OutputStream osNotFound = routingStrategy == 
RoutingStrategy.SPLIT ? session.write(notFound) : null;
+             RecordSetWriter notFoundWriter = routingStrategy == 
RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), 
reader.getSchema(), osNotFound, notFound) : null) {
+
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = 
context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            Record record;
+
+            //The overall relationship used by the Require_All_Enriched and 
Skip_Unenriched routing strategies to transfer Flowfiles.
+            //For the Split_Enriched_Unenriched strategy, the transfer of 
Flowfiles does not rely on this overall relationship. Instead, each individual 
record will be sent to success or failure.
+            Relationship targetRelationship = routingStrategy == 
RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE;
+
+            writer.beginRecordSet();
+
+            if (notFoundWriter != null) {
+                notFoundWriter.beginRecordSet();
+            }
+
+            int foundCount = 0;
+            int notFoundCount = 0;
+
+            int level = context.getProperty(GEOHASH_LEVEL).asInteger();
+            final String rawLatitudePath = 
context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+            final String rawLongitudePath = 
context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+            final String rawgeohashPath = 
context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+            while ((record = reader.nextRecord()) != null) {
+                boolean updated = false;
+                if (encode) {
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, 
longitudePath, record, format, level);
+                    updated = updateRecord(GEOHASH_RECORD_PATH, 
encodedGeohash, record, paths);
+                } else {
+                    WGS84Point decodedPoint = 
getDecodedPointFromGeohash(geohashPath, record, format);
+                    if (decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                && updateRecord(LONGITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) {
+                    if (!updated) { //If the routing strategy is 
Require_All_Enriched and there exists a record that is not updated, the entire 
flowfile should be route to REL_FAILURE
+                        targetRelationship = REL_FAILURE;
+                    }
+                } else {
+                    if (updated) {
+                        //If the routing strategy is Skip_Unenriched and there 
exists a record that is updated, the entire flowfile should be route to 
REL_SUCCESS
+                        //If the routing strategy is Split_Enriched_Unenriched 
and there exists a record that is updated, this record should be route to 
REL_SUCCESS
+                        targetRelationship = REL_SUCCESS;
+                    }
+                }
+
+                if (routingStrategy != RoutingStrategy.SPLIT || updated) {
+                    writer.write(record);
+                    foundCount++;
+                } else { //if the routing strategy is 
Split_Enriched_Unenriched and the record is not updated
+                    notFoundWriter.write(record);
+                    notFoundCount++;
+                }
+            }
+
+            final WriteResult writeResult = writer.finishRecordSet();
+            writer.close();
+
+            WriteResult notFoundWriterResult = null;
+
+            if (notFoundWriter != null) {
+                notFoundWriterResult = notFoundWriter.finishRecordSet();
+                notFoundWriter.close();
+            }
+
+            is.close();
+            os.close();
+
+            if (osNotFound != null) {
+                osNotFound.close();
+            }
+
+            output = session.putAllAttributes(output, 
buildAttributes(foundCount, writer.getMimeType(), writeResult));
+            if (routingStrategy != RoutingStrategy.SPLIT) {
+                session.transfer(output, targetRelationship);
+                session.remove(input);
+            } else {
+                if (notFoundCount > 0) {
+                    notFound = session.putAllAttributes(notFound, 
buildAttributes(notFoundCount, writer.getMimeType(), notFoundWriterResult));
+                    session.transfer(notFound, REL_FAILURE);
+                } else {
+                    session.remove(notFound);
+                }
+                session.transfer(output, REL_SUCCESS);
+                session.transfer(input, REL_ORIGINAL);
+            }
+
+        } catch (Exception ex) {
+            getLogger().error("Failed to {} due to {}", encode ? "encode" : 
"decode", ex.getLocalizedMessage(), ex);
+        }
+    }
+
+
+    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath 
longitudePath, Record record, String format, int level) {
+        RecordPathResult latitudeResult = latitudePath.evaluate(record);
+        RecordPathResult longitudeResult = longitudePath.evaluate(record);
+        Optional<FieldValue> latitudeField = 
latitudeResult.getSelectedFields().findFirst();
+        Optional<FieldValue> longitudeField = 
longitudeResult.getSelectedFields().findFirst();
+
+        if (!latitudeField.isPresent() || !longitudeField.isPresent()) {
+            return null;
+        }
+
+        FieldValue latitudeValue = latitudeField.get();
+        FieldValue longitudeValue = longitudeField.get();
+        Object latitudeVal = latitudeValue.getValue();
+        Object longitudeVal = longitudeValue.getValue();
+
+        if (latitudeVal == null || longitudeVal == null) {
+            return null;
+        }
+
+        double realLatValue = Double.parseDouble(latitudeVal.toString());
+        double realLongValue = Double.parseDouble(longitudeVal.toString());
+        GeoHash gh = GeoHash.withCharacterPrecision(realLatValue, 
realLongValue, level);
+
+        switch (GeohashFormat.valueOf(format)) {
+            case BINARY_STRING:
+                return gh.toBinaryString();
+            case LONG:
+                return gh.longValue();
+            default:
+                return gh.toBase32();
+        }
+    }
+
+    private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, 
Record record, String format) {

Review comment:
       As mentioned above, recommend using `GeohashFormat`:
   ```suggestion
       private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, 
Record record, GeohashFormat format) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/java/org/apache/nifi/processors/geohash/GeohashRecord.java
##########
@@ -0,0 +1,449 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.geohash;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.record.path.FieldValue;
+import org.apache.nifi.record.path.RecordPath;
+import org.apache.nifi.record.path.RecordPathResult;
+import org.apache.nifi.record.path.util.RecordPathCache;
+import org.apache.nifi.record.path.validation.RecordPathValidator;
+
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.WriteResult;
+
+import ch.hsr.geohash.GeoHash;
+import ch.hsr.geohash.WGS84Point;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Optional;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"geo", "geohash", "record"})
+@CapabilityDescription("A record-based processor that encodes and decodes 
Geohashes from and to latitude/longitude coordinates.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+public class GeohashRecord extends AbstractProcessor {
+
+    public enum ProcessingMode {
+        ENCODE, DECODE
+    }
+
+    public enum GeohashFormat {
+        BASE_32, BINARY_STRING, LONG
+    }
+
+    public enum RoutingStrategy {
+        SKIP_UNENRICHED,
+        SPLIT,
+        REQUIRE_ALL_ENRICHED
+    }
+
+    public static final PropertyDescriptor MODE = new 
PropertyDescriptor.Builder()
+            .name("mode")
+            .displayName("Mode")
+            .description("Specifies whether to encode latitude/longitude to 
geohash or decode geohash to latitude/longitude")
+            .required(true)
+            .allowableValues(ProcessingMode.values())
+            .defaultValue(ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor ROUTING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("routing-strategy")
+            .displayName("Routing Strategy")
+            .description("Specifies how to route records after encoding or 
decoding has been performed. "
+                    + "Skip_Unenriched will route a flowfile to success if any 
of its records is enriched, otherwise it will be sent to failure. "
+                    + "Split_Enriched_Unenriched will separate the records 
that have been enriched from those have not and send them to success, while 
unenriched records will be sent to failure; "
+                    + "and the original flowfile will be sent to the original 
relationship. "
+                    + "Require_All_Enriched will route a flowfile to success 
only if all of its records are enriched, otherwise it will be sent to failure")
+            .required(true)
+            .allowableValues(RoutingStrategy.values())
+            .defaultValue(RoutingStrategy.SKIP_UNENRICHED.name())
+            .build();
+
+    public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+            .name("record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the record reader service to use for 
reading incoming data")
+            .required(true)
+            .identifiesControllerService(RecordReaderFactory.class)
+            .build();
+
+    public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the record writer service to use for 
writing data")
+            .required(true)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
+
+    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("latitude-record-path")
+            .displayName("Latitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the latitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the latitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("longitude-record-path")
+            .displayName("Longitude Record Path")
+            .description("In the encode mode, this property specifies the 
record path to retrieve the longitude value; "
+                    + "in the decode mode, this property specifies the record 
path to put the longitude value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_FORMAT = new 
PropertyDescriptor.Builder()
+            .name("geohash-format")
+            .displayName("Geohash Format")
+            .description("In the encode mode, this property specifies the 
desired format for encoding geohash; "
+                    + "in the decode mode, this property specifies the format 
of geohash provided")
+            .required(true)
+            .allowableValues(GeohashFormat.values())
+            .defaultValue(GeohashFormat.BASE_32.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("geohash-level")
+            .displayName("Geohash Level")
+            .description("The integer precision level(1-12) desired for 
encoding geohash")
+            .required(true)
+            .addValidator(StandardValidators.createLongValidator(1, 12, true))
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .dependsOn(MODE, ProcessingMode.ENCODE.name())
+            .build();
+
+    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new 
PropertyDescriptor.Builder()
+            .name("geohash-record-path")
+            .displayName("Geohash Record Path")
+            .description("In the encode mode, this property specifies the 
record path to put the geohash value; "
+                    + "in the decode mode, this property specifies the record 
path to retrieve the geohash value")
+            .required(true)
+            .addValidator(new RecordPathValidator())
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Records that are successfully encoded or decoded 
will be routed to success")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Records that cannot be encoded or decoded will be 
routed to failure")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("With the Split_Enriched_Unenriched strategy, the 
original input flowfile will be sent to this relationship regardless of whether 
it was enriched or not.")
+            .build();
+
+    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
+            LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH
+    ));
+
+    private final RecordPathCache cache = new RecordPathCache(100);
+
+    private List<PropertyDescriptor> descriptors;
+
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        descriptors = new ArrayList<>();
+        descriptors.add(MODE);
+        descriptors.add(RECORD_READER);
+        descriptors.add(RECORD_WRITER);
+        descriptors.add(ROUTING_STRATEGY);
+        descriptors.add(LATITUDE_RECORD_PATH);
+        descriptors.add(LONGITUDE_RECORD_PATH);
+        descriptors.add(GEOHASH_RECORD_PATH);
+        descriptors.add(GEOHASH_FORMAT);
+        descriptors.add(GEOHASH_LEVEL);
+        descriptors = Collections.unmodifiableList(descriptors);
+
+        relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_ORIGINAL);
+        relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile input = session.get();
+        if (input == null) {
+            return;
+        }
+
+        final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final boolean encode = 
context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
+        final RoutingStrategy routingStrategy = 
RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
+        final String format = context.getProperty(GEOHASH_FORMAT).getValue();
+
+        FlowFile output = session.create(input);
+        FlowFile notFound = routingStrategy == RoutingStrategy.SPLIT ? 
session.create(input) : null;
+
+        try (InputStream is = session.read(input);
+             RecordReader reader = readerFactory.createRecordReader(input, is, 
getLogger());
+             OutputStream os = session.write(output);
+             RecordSetWriter writer = writerFactory.createWriter(getLogger(), 
writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
+             OutputStream osNotFound = routingStrategy == 
RoutingStrategy.SPLIT ? session.write(notFound) : null;
+             RecordSetWriter notFoundWriter = routingStrategy == 
RoutingStrategy.SPLIT ? writerFactory.createWriter(getLogger(), 
reader.getSchema(), osNotFound, notFound) : null) {
+
+            Map<PropertyDescriptor, RecordPath> paths = new HashMap<>();
+            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
+                String rawRecordPath = 
context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
+                RecordPath compiled = cache.getCompiled(rawRecordPath);
+                paths.put(descriptor, compiled);
+            }
+
+            Record record;
+
+            //The overall relationship used by the Require_All_Enriched and 
Skip_Unenriched routing strategies to transfer Flowfiles.
+            //For the Split_Enriched_Unenriched strategy, the transfer of 
Flowfiles does not rely on this overall relationship. Instead, each individual 
record will be sent to success or failure.
+            Relationship targetRelationship = routingStrategy == 
RoutingStrategy.REQUIRE_ALL_ENRICHED ? REL_SUCCESS : REL_FAILURE;
+
+            writer.beginRecordSet();
+
+            if (notFoundWriter != null) {
+                notFoundWriter.beginRecordSet();
+            }
+
+            int foundCount = 0;
+            int notFoundCount = 0;
+
+            int level = context.getProperty(GEOHASH_LEVEL).asInteger();
+            final String rawLatitudePath = 
context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath latitudePath = cache.getCompiled(rawLatitudePath);
+            final String rawLongitudePath = 
context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath longitudePath = cache.getCompiled(rawLongitudePath);
+            final String rawgeohashPath = 
context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
+            RecordPath geohashPath = cache.getCompiled(rawgeohashPath);
+
+            while ((record = reader.nextRecord()) != null) {
+                boolean updated = false;
+                if (encode) {
+                    Object encodedGeohash = getEncodedGeohash(latitudePath, 
longitudePath, record, format, level);
+                    updated = updateRecord(GEOHASH_RECORD_PATH, 
encodedGeohash, record, paths);
+                } else {
+                    WGS84Point decodedPoint = 
getDecodedPointFromGeohash(geohashPath, record, format);
+                    if (decodedPoint != null) {
+                        updated = updateRecord(LATITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLatitude()), record, paths)
+                                && updateRecord(LONGITUDE_RECORD_PATH, 
String.valueOf(decodedPoint.getLongitude()), record, paths);
+                    }
+                }
+
+                if (routingStrategy == RoutingStrategy.REQUIRE_ALL_ENRICHED) {
+                    if (!updated) { //If the routing strategy is 
Require_All_Enriched and there exists a record that is not updated, the entire 
flowfile should be routed to REL_FAILURE
+                        targetRelationship = REL_FAILURE;
+                    }
+                } else {
+                    if (updated) {
+                        //If the routing strategy is Skip_Unenriched and there 
exists a record that is updated, the entire flowfile should be routed to 
REL_SUCCESS
+                        //If the routing strategy is Split_Enriched_Unenriched 
and there exists a record that is updated, this record should be routed to 
REL_SUCCESS
+                        targetRelationship = REL_SUCCESS;
+                    }
+                }
+
+                if (routingStrategy != RoutingStrategy.SPLIT || updated) {
+                    writer.write(record);
+                    foundCount++;
+                } else { //if the routing strategy is 
Split_Enriched_Unenriched and the record is not updated
+                    notFoundWriter.write(record);
+                    notFoundCount++;
+                }
+            }
+
+            final WriteResult writeResult = writer.finishRecordSet();
+            writer.close();
+
+            WriteResult notFoundWriterResult = null;
+
+            if (notFoundWriter != null) {
+                notFoundWriterResult = notFoundWriter.finishRecordSet();
+                notFoundWriter.close();
+            }
+
+            is.close();
+            os.close();
+
+            if (osNotFound != null) {
+                osNotFound.close();
+            }
+
+            output = session.putAllAttributes(output, 
buildAttributes(foundCount, writer.getMimeType(), writeResult));
+            if (routingStrategy != RoutingStrategy.SPLIT) {
+                session.transfer(output, targetRelationship);
+                session.remove(input);
+            } else {
+                if (notFoundCount > 0) {
+                    notFound = session.putAllAttributes(notFound, 
buildAttributes(notFoundCount, writer.getMimeType(), notFoundWriterResult));
+                    session.transfer(notFound, REL_FAILURE);
+                } else {
+                    session.remove(notFound);
+                }
+                session.transfer(output, REL_SUCCESS);
+                session.transfer(input, REL_ORIGINAL);
+            }
+
+        } catch (Exception ex) {
+            getLogger().error("Failed to {} due to {}", encode ? "encode" : 
"decode", ex.getLocalizedMessage(), ex);
+        }
+    }
+
+
+    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath 
longitudePath, Record record, String format, int level) {

Review comment:
       Recommend changing the type of `format` from `String` to `GeohashFormat` 
so that `GeohashFormat.valueOf()` is not called for each record evaluation.
   ```suggestion
       private Object getEncodedGeohash(RecordPath latitudePath, RecordPath 
longitudePath, Record record, GeohashFormat format, int level) {
   ```

##########
File path: 
nifi-nar-bundles/nifi-geohash-bundle/nifi-geohash-processors/src/main/resources/docs/org.apache.nifi.processors.geohash.GeohashRecord/additionalDetails.html
##########
@@ -0,0 +1,55 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8"/>
+    <title>GeohashRecord</title>
+    <link rel="stylesheet" href="../../../../../css/component-usage.css" 
type="text/css"/>
+</head>
+
+<body>
+<h1>Overview</h1>
+<p>
+    A Geohash value corresponds to a specific area with pre-defined 
granularity, and is widely used in identifying,
+    representing, and indexing geospatial objects.
+    This GeohashRecord processor provides the ability to encode and decode 
Geohashes with desired format and precision.
+</p>
+<h3>Formats supported</h3>
+<p>
+<ul>
+    <li>Base32 string: The most commonly used alphanumeric version. It is 
compact and more human-readable by discarding
+        some letters (such as "a" and "o", "i" and "l") that might cause 
confusion.
+    </li>
+    <li>Binary String: This format is generated by directly interleaving 
the latitude and longitude binary strings. The even
+        bits in the binary string correspond to the longitude, while the odd 
bits correspond to the latitude.
+    </li>
+    <li>Long Integer: Although this 64-bit integer format is not 
human-readable, it can be calculated very fast and is

Review comment:
       ```suggestion
       <li>Long: Although this 64-bit number format is not human-readable, it 
can be calculated very fast and is
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to