This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 68228921a4 NIFI-12664 Removed deprecated DMC in GetHBase
68228921a4 is described below
commit 68228921a43c506e92a0476f550951e164dfb90e
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 25 09:22:41 2024 +0400
NIFI-12664 Removed deprecated DMC in GetHBase
This closes #8301
Signed-off-by: David Handermann <[email protected]>
---
.../main/java/org/apache/nifi/hbase/GetHBase.java | 125 ++------------------
.../java/org/apache/nifi/hbase/TestGetHBase.java | 128 +--------------------
2 files changed, 8 insertions(+), 245 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
index 5777a67861..8b2cf0d973 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/GetHBase.java
@@ -26,25 +26,20 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hbase.io.JsonRowSerializer;
import org.apache.nifi.hbase.io.RowSerializer;
import org.apache.nifi.hbase.scan.Column;
import org.apache.nifi.hbase.scan.ResultCell;
-import org.apache.nifi.hbase.util.ObjectSerDe;
-import org.apache.nifi.hbase.util.StringSerDe;
+import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -52,11 +47,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
@@ -103,13 +94,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
.required(true)
.identifiesControllerService(HBaseClientService.class)
.build();
- static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new
PropertyDescriptor.Builder()
- .name("Distributed Cache Service")
- .description("Specifies the Controller Service that should be used
to maintain state about what has been pulled from HBase" +
- " so that if a new node begins pulling data, it won't
duplicate all of the work that has been done.")
- .required(false)
- .identifiesControllerService(DistributedMapCacheClient.class)
- .build();
static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("Specifies which character set is used to encode the
data in HBase")
@@ -157,7 +141,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
private final AtomicReference<ScanResult> lastResult = new
AtomicReference<>();
private volatile List<Column> columns = new ArrayList<>();
- private volatile boolean justElectedPrimaryNode = false;
private volatile String previousTable = null;
@Override
@@ -169,7 +152,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(HBASE_CLIENT_SERVICE);
- properties.add(DISTRIBUTED_CACHE_SERVICE);
properties.add(TABLE_NAME);
properties.add(COLUMNS);
properties.add(AUTHORIZATIONS);
@@ -204,21 +186,14 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
}
}
+ @Override
+ public void migrateProperties(PropertyConfiguration config) {
+ super.migrateProperties(config);
+ config.removeProperty("Distributed Cache Service");
+ }
+
@OnScheduled
public void parseColumns(final ProcessContext context) throws IOException {
- final StateMap stateMap =
context.getStateManager().getState(Scope.CLUSTER);
- if (!stateMap.getStateVersion().isPresent()) {
- // no state has been stored in the State Manager - check if we
have state stored in the
- // DistributedMapCacheClient service and migrate it if so
- final DistributedMapCacheClient client =
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
- final ScanResult scanResult = getState(client);
- if (scanResult != null) {
- context.getStateManager().setState(scanResult.toFlatMap(),
Scope.CLUSTER);
- }
-
- clearState(client);
- }
-
final String columnsValue =
context.getProperty(COLUMNS).evaluateAttributeExpressions().getValue();
final String[] columns = (columnsValue == null ||
columnsValue.isEmpty() ? new String[0] : columnsValue.split(","));
@@ -236,19 +211,6 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
}
}
- @OnPrimaryNodeStateChange
- public void onPrimaryNodeChange(final PrimaryNodeState newState) {
- justElectedPrimaryNode = (newState ==
PrimaryNodeState.ELECTED_PRIMARY_NODE);
- }
-
- @OnRemoved
- public void onRemoved(final ProcessContext context) {
- final DistributedMapCacheClient client =
context.getProperty(DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
- if (client != null) {
- clearState(client);
- }
- }
-
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
@@ -427,91 +389,18 @@ public class GetHBase extends AbstractProcessor implements VisibilityFetchSuppor
return 500;
}
- protected File getStateDir() {
- return new File("conf/state");
- }
-
- protected File getStateFile() {
- return new File(getStateDir(), "getHBase-" + getIdentifier());
- }
-
- protected String getKey() {
- return "getHBase-" + getIdentifier() + "-state";
- }
-
protected List<Column> getColumns() {
return columns;
}
- private void clearState(final DistributedMapCacheClient client) {
- final File localState = getStateFile();
- if (localState.exists()) {
- localState.delete();
- }
-
- if (client != null) {
- try {
- client.remove(getKey(), new StringSerDe());
- } catch (IOException e) {
- getLogger().warn("Processor state was not cleared from
distributed cache due to {}", new Object[]{e});
- }
- }
- }
-
-
private ScanResult getState(final ProcessSession session) throws
IOException {
final StateMap stateMap = session.getState(Scope.CLUSTER);
if (!stateMap.getStateVersion().isPresent()) {
return null;
}
-
return ScanResult.fromFlatMap(stateMap.toMap());
}
- private ScanResult getState(final DistributedMapCacheClient client) throws
IOException {
- final StringSerDe stringSerDe = new StringSerDe();
- final ObjectSerDe objectSerDe = new ObjectSerDe();
-
- ScanResult scanResult = lastResult.get();
- // if we have no previous result, or we just became primary, pull from
distributed cache
- if (scanResult == null || justElectedPrimaryNode) {
- if (client != null) {
- final Object obj = client.get(getKey(), stringSerDe,
objectSerDe);
- if (obj == null || !(obj instanceof ScanResult)) {
- scanResult = null;
- } else {
- scanResult = (ScanResult) obj;
- getLogger().debug("Retrieved state from the distributed
cache, previous timestamp was {}", new Object[] {scanResult.getTimestamp()});
- }
- }
-
- // no requirement to pull an update from the distributed cache
anymore.
- justElectedPrimaryNode = false;
- }
-
- // Check the persistence file. We want to use the latest timestamp
that we have so that
- // we don't duplicate data.
- final File file = getStateFile();
- if (file.exists()) {
- try (final InputStream fis = new FileInputStream(file);
- final ObjectInputStream ois = new ObjectInputStream(fis)) {
-
- final Object obj = ois.readObject();
- if (obj != null && (obj instanceof ScanResult)) {
- final ScanResult localScanResult = (ScanResult) obj;
- if (scanResult == null || localScanResult.getTimestamp() >
scanResult.getTimestamp()) {
- scanResult = localScanResult;
- getLogger().debug("Using last timestamp from local
state because it was newer than the distributed cache, or no value existed in
the cache");
- }
- }
- } catch (final IOException | ClassNotFoundException ioe) {
- getLogger().warn("Failed to recover persisted state from {}
due to {}. Assuming that state from distributed cache is correct.", new
Object[]{file, ioe});
- }
- }
-
- return scanResult;
- }
-
public static class ScanResult implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
index 5245d142d7..6ca69c1cd7 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java
@@ -16,23 +16,18 @@
*/
package org.apache.nifi.hbase;
-import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.hbase.GetHBase.ScanResult;
import org.apache.nifi.hbase.scan.Column;
-import org.apache.nifi.hbase.util.StringSerDe;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.File;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -47,11 +42,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class TestGetHBase {
@@ -74,22 +67,12 @@ public class TestGetHBase {
runner.enableControllerService(hBaseClient);
runner.setProperty(GetHBase.TABLE_NAME, "nifi");
- runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
runner.setProperty(GetHBase.HBASE_CLIENT_SERVICE, "hbaseClient");
runner.setProperty(GetHBase.AUTHORIZATIONS, "");
runner.setValidateExpressionUsage(true);
}
- @AfterEach
- public void cleanup() {
- final File file = proc.getStateFile();
- if (file.exists()) {
- file.delete();
- }
- assertFalse(file.exists());
- }
-
@Test
public void testColumnsValidation() {
runner.assertValid();
@@ -139,12 +122,6 @@ public class TestGetHBase {
@Test
public void testPersistAndRecoverFromLocalState() {
- final File stateFile = new File("target/test-recover-state.bin");
- if (!stateFile.delete() && stateFile.exists()) {
- fail("Could not delete state file " + stateFile);
- }
- proc.setStateFile(stateFile);
-
final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>();
@@ -164,7 +141,7 @@ public class TestGetHBase {
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
runner.clearTransferState();
- proc = new MockGetHBase(stateFile);
+ proc = new MockGetHBase();
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
@@ -195,14 +172,6 @@ public class TestGetHBase {
runner.run();
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
- // delete the processor's local state to simulate becoming the primary
node
- // for the first time, should use the state from distributed cache
- final File stateFile = proc.getStateFile();
- if (!stateFile.delete() && stateFile.exists()) {
- fail("Could not delete state file " + stateFile);
- }
- proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
hBaseClient.addResult("row0", cells, now - 2);
hBaseClient.addResult("row1", cells, now - 1);
hBaseClient.addResult("row2", cells, now - 1);
@@ -214,76 +183,6 @@ public class TestGetHBase {
runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
}
- @Test
- public void testBecomePrimaryWithNewerLocalState() throws
InitializationException {
- final long now = System.currentTimeMillis();
-
- final Map<String, String> cells = new HashMap<>();
- cells.put("greeting", "hello");
- cells.put("name", "nifi");
-
- hBaseClient.addResult("row0", cells, now - 2);
- hBaseClient.addResult("row1", cells, now - 1);
- hBaseClient.addResult("row2", cells, now - 1);
- hBaseClient.addResult("row3", cells, now);
-
- runner.run(100);
- runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
-
- // trick for testing so that row4 gets written to local state but not
to the real cache
- final MockCacheClient otherCacheClient = new MockCacheClient();
- runner.addControllerService("otherCacheClient", otherCacheClient);
- runner.enableControllerService(otherCacheClient);
- runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE,
"otherCacheClient");
-
- hBaseClient.addResult("row4", cells, now + 1);
- runner.run();
- runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 5);
-
- // set back the original cache cacheClient which is missing row4
- runner.setProperty(GetHBase.DISTRIBUTED_CACHE_SERVICE, "cacheClient");
-
- // become the primary node, but we have existing local state with rows
0-4
- // so we shouldn't get any output because we should use the local state
- proc.onPrimaryNodeChange(PrimaryNodeState.ELECTED_PRIMARY_NODE);
-
- hBaseClient.addResult("row0", cells, now - 2);
- hBaseClient.addResult("row1", cells, now - 1);
- hBaseClient.addResult("row2", cells, now - 1);
- hBaseClient.addResult("row3", cells, now);
- hBaseClient.addResult("row4", cells, now + 1);
-
- runner.clearTransferState();
- runner.run(100);
- runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 0);
- }
-
- @Test
- public void testOnRemovedClearsState() throws IOException {
- final long now = System.currentTimeMillis();
-
- final Map<String, String> cells = new HashMap<>();
- cells.put("greeting", "hello");
- cells.put("name", "nifi");
-
- hBaseClient.addResult("row0", cells, now - 2);
- hBaseClient.addResult("row1", cells, now - 1);
- hBaseClient.addResult("row2", cells, now - 1);
- hBaseClient.addResult("row3", cells, now);
-
- runner.run(100);
- runner.assertAllFlowFilesTransferred(GetHBase.REL_SUCCESS, 4);
-
- // should have a local state file and a cache entry before removing
- runner.getStateManager().assertStateSet(Scope.CLUSTER);
-
- proc.onRemoved(runner.getProcessContext());
-
- // onRemoved should have cleared both
- assertFalse(proc.getStateFile().exists());
- assertFalse(cacheClient.containsKey(proc.getKey(), new StringSerDe()));
- }
-
@Test
public void testChangeTableNameClearsState() {
final long now = System.currentTimeMillis();
@@ -412,36 +311,11 @@ public class TestGetHBase {
// Mock processor to override the location of the state file
private static class MockGetHBase extends GetHBase {
- private static final String DEFAULT_STATE_FILE_NAME =
"target/TestGetHBase.bin";
-
- private File stateFile;
-
- public MockGetHBase() {
- this(new File(DEFAULT_STATE_FILE_NAME));
- }
-
- public MockGetHBase(final File stateFile) {
- this.stateFile = stateFile;
- }
-
- public void setStateFile(final File stateFile) {
- this.stateFile = stateFile;
- }
-
@Override
protected int getBatchSize() {
return 2;
}
- @Override
- protected File getStateDir() {
- return new File("target");
- }
-
- @Override
- protected File getStateFile() {
- return stateFile;
- }
}
private class MockCacheClient extends AbstractControllerService implements
DistributedMapCacheClient {