This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit b8b508c1a3e1049281111856f5e2d3e116736fd8
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Apr 10 21:55:41 2023 +0300

    NIFI-11427 Upgraded Atlas from 2.2.0 to 2.3.0
    
    This closes #7158
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit cbca499070404914dc3e31b14262936f26ba9a41)
---
 .../org/apache/nifi/atlas/hook/NiFiAtlasHook.java  |   6 +
 .../atlas/emulator/AtlasAPIV2ServerEmulator.java   |   5 -
 .../apache/nifi/atlas/emulator/EmbeddedKafka.java  | 173 ---------------------
 .../provenance/analyzer/TestAwsS3DirectoryV2.java  |   2 +-
 nifi-nar-bundles/nifi-atlas-bundle/pom.xml         |   2 +-
 5 files changed, 8 insertions(+), 180 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
index 3f778e36ec..a7e526fe9f 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/hook/NiFiAtlasHook.java
@@ -31,6 +31,7 @@ import java.util.List;
 public class NiFiAtlasHook extends AtlasHook implements LineageContext {
 
     public static final String NIFI_USER = "nifi";
+    public static final String NIFI_SOURCE = "nifi";
 
     private NiFiAtlasClient atlasClient;
 
@@ -45,6 +46,11 @@ public class NiFiAtlasHook extends AtlasHook implements 
LineageContext {
         messages.add(message);
     }
 
+    @Override
+    public String getMessageSource() {
+        return NIFI_SOURCE;
+    }
+
     public void commitMessages() {
         final NotificationSender notificationSender = 
createNotificationSender();
         notificationSender.setAtlasClient(atlasClient);
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
index de2df80fa2..a7771c73ad 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/AtlasAPIV2ServerEmulator.java
@@ -80,7 +80,6 @@ public class AtlasAPIV2ServerEmulator {
     private Server server;
     private ServerConnector httpConnector;
     private AtlasNotificationServerEmulator notificationServerEmulator;
-    private EmbeddedKafka embeddedKafka;
 
     public static void main(String[] args) throws Exception {
         final AtlasAPIV2ServerEmulator emulator = new 
AtlasAPIV2ServerEmulator();
@@ -95,9 +94,6 @@ public class AtlasAPIV2ServerEmulator {
         server.start();
         logger.info("Starting {} on port {}", 
AtlasAPIV2ServerEmulator.class.getSimpleName(), httpConnector.getLocalPort());
 
-        embeddedKafka = new EmbeddedKafka();
-        embeddedKafka.start();
-
         notificationServerEmulator.consume(m -> {
             if (m instanceof HookNotificationV1.EntityCreateRequest) {
                 HookNotificationV1.EntityCreateRequest em = 
(HookNotificationV1.EntityCreateRequest) m;
@@ -225,7 +221,6 @@ public class AtlasAPIV2ServerEmulator {
 
     public void stop() throws Exception {
         notificationServerEmulator.stop();
-        embeddedKafka.stop();
         server.stop();
     }
 
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
deleted file mode 100644
index 4ace2be0d4..0000000000
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/emulator/EmbeddedKafka.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.atlas.emulator;
-
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Properties;
-
-/**
- * Embedded Kafka server, primarily to be used for testing.
- */
-public class EmbeddedKafka {
-
-    private final KafkaServerStartable kafkaServer;
-
-    private final Properties zookeeperConfig;
-
-    private final Properties kafkaConfig;
-
-    private final ZooKeeperServer zkServer;
-
-    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
-
-    private boolean started;
-
-    /**
-     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
-     * configuration properties will be loaded from 'server.properties' and
-     * 'zookeeper.properties' located at the root of the classpath.
-     */
-    public EmbeddedKafka() {
-        this(loadPropertiesFromClasspath("/server.properties"), 
loadPropertiesFromClasspath("/zookeeper.properties"));
-    }
-
-    /**
-     * Will create instance of the embedded Kafka server.
-     *
-     * @param kafkaConfig
-     *            Kafka configuration properties
-     * @param zookeeperConfig
-     *            Zookeeper configuration properties
-     */
-    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
-        this.cleanupKafkaWorkDir();
-
-        this.kafkaConfig = kafkaConfig;
-        this.zookeeperConfig = zookeeperConfig;
-
-        this.kafkaServer = new KafkaServerStartable(new 
KafkaConfig(kafkaConfig));
-        this.zkServer = new ZooKeeperServer();
-    }
-
-    /**
-     * Will start embedded Kafka server. Its data directories will be created
-     * at 'kafka-tmp' directory relative to the working directory of the 
current
-     * runtime. The data directories will be deleted upon JVM exit.
-     *
-     */
-    public void start() {
-        if (!this.started) {
-            logger.info("Starting Zookeeper server");
-            this.startZookeeper();
-
-            logger.info("Starting Kafka server");
-            this.kafkaServer.startup();
-
-            logger.info("Embedded Kafka is started at localhost:" + 
this.kafkaServer.staticServerConfig().port()
-                    + ". Zookeeper connection string: " + 
this.kafkaConfig.getProperty("zookeeper.connect"));
-            this.started = true;
-        }
-    }
-
-    /**
-     * Will stop embedded Kafka server, cleaning up all working directories.
-     */
-    public void stop() {
-        if (this.started) {
-            logger.info("Shutting down Kafka server");
-            this.kafkaServer.shutdown();
-            this.kafkaServer.awaitShutdown();
-            logger.info("Shutting down Zookeeper server");
-            this.shutdownZookeeper();
-            logger.info("Embedded Kafka is shut down.");
-            this.cleanupKafkaWorkDir();
-            this.started = false;
-        }
-    }
-
-    /**
-     *
-     */
-    private void cleanupKafkaWorkDir() {
-        File kafkaTmp = new File("target/kafka-tmp");
-        try {
-            FileUtils.deleteDirectory(kafkaTmp);
-        } catch (Exception e) {
-            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
-        }
-    }
-
-    /**
-     * Will start Zookeeper server via {@link ServerCnxnFactory}
-     */
-    private void startZookeeper() {
-        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
-        try {
-            quorumConfiguration.parseProperties(this.zookeeperConfig);
-
-            ServerConfig configuration = new ServerConfig();
-            configuration.readFrom(quorumConfiguration);
-
-            FileTxnSnapLog txnLog = new 
FileTxnSnapLog(configuration.getDataLogDir(), configuration.getDataDir());
-
-            zkServer.setTxnLogFactory(txnLog);
-            zkServer.setTickTime(configuration.getTickTime());
-            
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
-            
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
-            ServerCnxnFactory zookeeperConnectionFactory = 
ServerCnxnFactory.createFactory();
-            
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
-                    configuration.getMaxClientCnxns());
-            zookeeperConnectionFactory.startup(zkServer);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to start Zookeeper 
server", e);
-        }
-    }
-
-    /**
-     * Will shut down Zookeeper server.
-     */
-    private void shutdownZookeeper() {
-        zkServer.shutdown();
-    }
-
-    /**
-     * Will load {@link Properties} from properties file discovered at the
-     * provided path relative to the root of the classpath.
-     */
-    private static Properties loadPropertiesFromClasspath(String path) {
-        try {
-            Properties kafkaProperties = new Properties();
-            kafkaProperties.load(Class.class.getResourceAsStream(path));
-            return kafkaProperties;
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-}
diff --git 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
index 77cfae74d8..537d71e761 100644
--- 
a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
+++ 
b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/test/java/org/apache/nifi/atlas/provenance/analyzer/TestAwsS3DirectoryV2.java
@@ -82,7 +82,7 @@ public class TestAwsS3DirectoryV2 extends 
AbstractTestAwsS3Directory {
             assertEquals(AwsS3Directory.TYPE_DIRECTORY_V2, ref.getTypeName());
             assertEquals(String.format("s3a://%s%s/@%s", AWS_BUCKET, 
actualPath, ATLAS_NAMESPACE), ref.get(ATTR_QUALIFIED_NAME));
             assertEquals(directory, ref.get(ATTR_NAME));
-            assertEquals(StringUtils.substringBeforeLast(actualPath, "/") + 
"/", ref.get(ATTR_OBJECT_PREFIX_V2));
+            assertEquals(actualPath + "/", ref.get(ATTR_OBJECT_PREFIX_V2));
             assertNotNull(ref.get(ATTR_CONTAINER_V2));
 
             ref = (Referenceable) ref.get(ATTR_CONTAINER_V2);
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml 
b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
index e74e91b4e3..59860d3163 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-atlas-bundle/pom.xml
@@ -26,7 +26,7 @@
     <packaging>pom</packaging>
 
     <properties>
-        <atlas.version>2.2.0</atlas.version>
+        <atlas.version>2.3.0</atlas.version>
     </properties>
 
     <modules>

Reply via email to