This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit b8b508c1a3e1049281111856f5e2d3e116736fd8 Author: Pierre Villard <[email protected]> AuthorDate: Mon Apr 10 21:55:41 2023 +0300 NIFI-11427 Upgraded Atlas from 2.2.0 to 2.3.0 This closes #7158 Signed-off-by: David Handermann <[email protected]> (cherry picked from commit cbca499070404914dc3e31b14262936f26ba9a41) --- .../org/apache/nifi/atlas/hook/NiFiAtlasHook.java | 6 + .../atlas/emulator/AtlasAPIV2ServerEmulator.java | 5 - .../apache/nifi/atlas/emulator/EmbeddedKafka.java | 173 --------------------- .../provenance/analyzer/TestAwsS3DirectoryV2.java | 2 +- nifi-nar-bundles/nifi-atlas-bundle/pom.xml | 2 +- 5 files changed, 8 insertions(+), 180 deletions(-) diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java index 3f778e36ec..a7e526fe9f 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java @@ -31,6 +31,7 @@ import java.util.List; public class NiFiAtlasHook extends AtlasHook implements LineageContext { public static final String NIFI_USER = "nifi"; + public static final String NIFI_SOURCE = "nifi"; private NiFiAtlasClient atlasClient; @@ -45,6 +46,11 @@ public class NiFiAtlasHook extends AtlasHook implements LineageContext { messages.add(message); } + @Override + public String getMessageSource() { + return NIFI_SOURCE; + } + public void commitMessages() { final NotificationSender notificationSender = createNotificationSender(); notificationSender.setAtlasClient(atlasClient); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java index de2df80fa2..a7771c73ad 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java @@ -80,7 +80,6 @@ public class AtlasAPIV2ServerEmulator { private Server server; private ServerConnector httpConnector; private AtlasNotificationServerEmulator notificationServerEmulator; - private EmbeddedKafka embeddedKafka; public static void main(String[] args) throws Exception { final AtlasAPIV2ServerEmulator emulator = new AtlasAPIV2ServerEmulator(); @@ -95,9 +94,6 @@ public class AtlasAPIV2ServerEmulator { server.start(); logger.info("Starting {} on port {}", AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort()); - embeddedKafka = new EmbeddedKafka(); - embeddedKafka.start(); - notificationServerEmulator.consume(m -> { if (m instanceof HookNotificationV1.EntityCreateRequest) { HookNotificationV1.EntityCreateRequest em = (HookNotificationV1.EntityCreateRequest) m; @@ -225,7 +221,6 @@ public class AtlasAPIV2ServerEmulator { public void stop() throws Exception { notificationServerEmulator.stop(); - embeddedKafka.stop(); server.stop(); } diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java deleted file mode 100644 index 4ace2be0d4..0000000000 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * 
contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.atlas.emulator; - -import kafka.server.KafkaConfig; -import kafka.server.KafkaServerStartable; -import org.apache.commons.io.FileUtils; -import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.QuorumPeerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.util.Properties; - -/** - * Embedded Kafka server, primarily to be used for testing. - */ -public class EmbeddedKafka { - - private final KafkaServerStartable kafkaServer; - - private final Properties zookeeperConfig; - - private final Properties kafkaConfig; - - private final ZooKeeperServer zkServer; - - private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class); - - private boolean started; - - /** - * Will create instance of the embedded Kafka server. Kafka and Zookeeper - * configuration properties will be loaded from 'server.properties' and - * 'zookeeper.properties' located at the root of the classpath. 
- */ - public EmbeddedKafka() { - this(loadPropertiesFromClasspath("/server.properties"), loadPropertiesFromClasspath("/zookeeper.properties")); - } - - /** - * Will create instance of the embedded Kafka server. - * - * @param kafkaConfig - * Kafka configuration properties - * @param zookeeperConfig - * Zookeeper configuration properties - */ - public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) { - this.cleanupKafkaWorkDir(); - - this.kafkaConfig = kafkaConfig; - this.zookeeperConfig = zookeeperConfig; - - this.kafkaServer = new KafkaServerStartable(new KafkaConfig(kafkaConfig)); - this.zkServer = new ZooKeeperServer(); - } - - /** - * Will start embedded Kafka server. Its data directories will be created - * at 'kafka-tmp' directory relative to the working directory of the current - * runtime. The data directories will be deleted upon JVM exit. - * - */ - public void start() { - if (!this.started) { - logger.info("Starting Zookeeper server"); - this.startZookeeper(); - - logger.info("Starting Kafka server"); - this.kafkaServer.startup(); - - logger.info("Embedded Kafka is started at localhost:" + this.kafkaServer.staticServerConfig().port() - + ". Zookeeper connection string: " + this.kafkaConfig.getProperty("zookeeper.connect")); - this.started = true; - } - } - - /** - * Will stop embedded Kafka server, cleaning up all working directories. 
- */ - public void stop() { - if (this.started) { - logger.info("Shutting down Kafka server"); - this.kafkaServer.shutdown(); - this.kafkaServer.awaitShutdown(); - logger.info("Shutting down Zookeeper server"); - this.shutdownZookeeper(); - logger.info("Embedded Kafka is shut down."); - this.cleanupKafkaWorkDir(); - this.started = false; - } - } - - /** - * - */ - private void cleanupKafkaWorkDir() { - File kafkaTmp = new File("target/kafka-tmp"); - try { - FileUtils.deleteDirectory(kafkaTmp); - } catch (Exception e) { - logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath()); - } - } - - /** - * Will start Zookeeper server via {@link ServerCnxnFactory} - */ - private void startZookeeper() { - QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); - try { - quorumConfiguration.parseProperties(this.zookeeperConfig); - - ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - FileTxnSnapLog txnLog = new FileTxnSnapLog(configuration.getDataLogDir(), configuration.getDataDir()); - - zkServer.setTxnLogFactory(txnLog); - zkServer.setTickTime(configuration.getTickTime()); - zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout()); - zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout()); - ServerCnxnFactory zookeeperConnectionFactory = ServerCnxnFactory.createFactory(); - zookeeperConnectionFactory.configure(configuration.getClientPortAddress(), - configuration.getMaxClientCnxns()); - zookeeperConnectionFactory.startup(zkServer); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - throw new IllegalStateException("Failed to start Zookeeper server", e); - } - } - - /** - * Will shut down Zookeeper server. - */ - private void shutdownZookeeper() { - zkServer.shutdown(); - } - - /** - * Will load {@link Properties} from properties file discovered at the - * provided path relative to the root of the classpath. 
- */ - private static Properties loadPropertiesFromClasspath(String path) { - try { - Properties kafkaProperties = new Properties(); - kafkaProperties.load(Class.class.getResourceAsStream(path)); - return kafkaProperties; - } catch (Exception e) { - throw new IllegalStateException(e); - } - } -} diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java index 77cfae74d8..537d71e761 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java +++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java @@ -82,7 +82,7 @@ public class TestAwsS3DirectoryV2 extends AbstractTestAwsS3Directory { assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName()); assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME)); assertEquals(directory, ref.get(ATTR_NAME)); - assertEquals(StringUtils.substringBeforeLast(actualPath, "/") + "/", ref.get(ATTR_OBJECT_PREFIX_V2)); + assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2)); assertNotNull(ref.get(ATTR_CONTAINER_V2)); ref = (Referenceable) ref.get(ATTR_CONTAINER_V2); diff --git a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml index e74e91b4e3..59860d3163 100644 --- a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml @@ -26,7 +26,7 @@ <packaging>pom</packaging> <properties> - <atlas.version>2.2.0</atlas.version> + <atlas.version>2.3.0</atlas.version> </properties> <modules>
