This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 15a1134c85 NIFI-11440 Speed up Iceberg Hive Metastore Tests
15a1134c85 is described below

commit 15a1134c85c38e1d6fe146259ca8b2f77c6cdbf5
Author: Mark Bathori <[email protected]>
AuthorDate: Thu Apr 13 11:49:52 2023 +0200

    NIFI-11440 Speed up Iceberg Hive Metastore Tests
    
    This closes #7170
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 44a7f7f38b3950a8a0135d1ce8602a4890c3d995)
---
 .../nifi/hive/metastore/ThriftMetastore.java       | 10 +--
 .../hive/TestTriggerHiveMetaStoreEvent.java        | 15 +++--
 .../iceberg/TestPutIcebergWithHiveCatalog.java     | 71 +++++++++++-----------
 3 files changed, 51 insertions(+), 45 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
index 5e1425d667..2b3dc4da5c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
@@ -19,15 +19,15 @@ package org.apache.nifi.hive.metastore;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /** A JUnit Extension that creates a Hive Metastore Thrift service backed by a Hive Metastore using an in-memory Derby database. */
-public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
+public class ThriftMetastore implements BeforeAllCallback, AfterAllCallback {
 
     private final MetastoreCore metastoreCore;
 
@@ -43,12 +43,12 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
     }
 
     @Override
-    public void beforeEach(ExtensionContext context) throws Exception {
+    public void beforeAll(ExtensionContext context) throws Exception {
         metastoreCore.initialize(configOverrides);
     }
 
     @Override
-    public void afterEach(ExtensionContext context) {
+    public void afterAll(ExtensionContext context) {
         metastoreCore.shutdown();
     }
 
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
index 0cd52f801c..72e312439f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
@@ -37,6 +37,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -56,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;
 
+@DisabledOnOs(WINDOWS)
 public class TestTriggerHiveMetaStoreEvent {
 
     private TestRunner runner;
@@ -71,7 +74,7 @@ public class TestTriggerHiveMetaStoreEvent {
     );
 
     @RegisterExtension
-    public ThriftMetastore metastore = new ThriftMetastore()
+    public static ThriftMetastore metastore = new ThriftMetastore()
             .withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(), "org.apache.hive.hcatalog.listener.DbNotificationListener"));
 
     @BeforeEach
@@ -80,6 +83,11 @@ public class TestTriggerHiveMetaStoreEvent {
         metaStoreClient = metastore.getMetaStoreClient();
     }
 
+    @AfterEach
+    public void tearDown() throws TException {
+        metaStoreClient.dropTable(TEST_DATABASE_NAME, TEST_TABLE_NAME);
+    }
+
     private void initUnPartitionedTable() throws Exception {
         createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME, Collections.emptyList(), metastore.getWarehouseLocation() + "/" + TEST_TABLE_NAME);
     }
@@ -91,7 +99,6 @@ public class TestTriggerHiveMetaStoreEvent {
         createPartition(table, Lists.newArrayList("2018", "march"));
     }
 
-    @DisabledOnOs(WINDOWS)
     @Test
     public void testInsertOnUnPartitionedTable() throws Exception {
         initUnPartitionedTable();
@@ -126,7 +133,6 @@ public class TestTriggerHiveMetaStoreEvent {
         assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
     }
 
-    @DisabledOnOs(WINDOWS)
     @Test
     public void testInsertOnPartitionedTable() throws Exception {
         initPartitionedTable();
@@ -162,7 +168,6 @@ public class TestTriggerHiveMetaStoreEvent {
         assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
     }
 
-    @DisabledOnOs(WINDOWS)
     @Test
     public void testAddPartition() throws Exception {
         initPartitionedTable();
@@ -204,7 +209,6 @@ public class TestTriggerHiveMetaStoreEvent {
         assertDoesNotThrow(() -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
     }
 
-    @DisabledOnOs(WINDOWS)
     @Test
     public void testDropPartition() throws Exception {
         initPartitionedTable();
@@ -246,7 +250,6 @@ public class TestTriggerHiveMetaStoreEvent {
         assertThrows(NoSuchObjectException.class, () -> metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME, Arrays.asList("2017", "june")));
     }
 
-    @DisabledOnOs(WINDOWS)
     @Test
     public void testUnknownEventType() throws Exception {
         initUnPartitionedTable();
diff --git a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index bd4f959c43..fb5f5ce41a 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -22,7 +22,6 @@ import org.apache.commons.io.IOUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -38,6 +37,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -60,18 +60,23 @@ import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateN
 import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
 import static org.junit.jupiter.api.condition.OS.WINDOWS;
 
+@DisabledOnOs(WINDOWS)
 public class TestPutIcebergWithHiveCatalog {
 
     private TestRunner runner;
     private PutIceberg processor;
     private Schema inputSchema;
+    private TestHiveCatalogService catalogService;
 
     @RegisterExtension
-    public ThriftMetastore metastore = new ThriftMetastore();
+    public static ThriftMetastore metastore = new ThriftMetastore();
 
-    private static final Namespace NAMESPACE = Namespace.of("test_metastore");
+    private static final String CATALOG_NAME = "test_metastore";
+    private static final String TABLE_NAME = "users";
 
-    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users");
+    private static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME);
+
+    private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
 
     private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
             Types.NestedField.required(1, "id", Types.IntegerType.get()),
@@ -85,6 +90,16 @@ public class TestPutIcebergWithHiveCatalog {
         inputSchema = new Schema.Parser().parse(avroSchema);
 
         processor = new PutIceberg();
+
+        catalogService = new TestHiveCatalogService.Builder()
+                .withMetastoreUri(metastore.getThriftConnectionUri())
+                .withWarehouseLocation(metastore.getWarehouseLocation())
+                .build();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
     }
 
     private void initRecordReader() throws InitializationException {
@@ -106,28 +121,19 @@ public class TestPutIcebergWithHiveCatalog {
         runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
     }
 
-    private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
-        TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
-                .withMetastoreUri(metastore.getThriftConnectionUri())
-                .withWarehouseLocation(metastore.getWarehouseLocation())
-                .build();
-        Catalog catalog = catalogService.getCatalog();
-
+    private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
         Map<String, String> tableProperties = new HashMap<>();
         tableProperties.put(TableProperties.FORMAT_VERSION, "2");
         tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
 
-        catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+        catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
 
         runner.addControllerService("catalog-service", catalogService);
         runner.enableControllerService(catalogService);
 
         runner.setProperty(PutIceberg.CATALOG, "catalog-service");
-
-        return catalog;
     }
 
-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"avro"})
     public void onTriggerPartitioned(String fileFormat) throws Exception {
@@ -137,14 +143,14 @@ public class TestPutIcebergWithHiveCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -165,7 +171,6 @@ public class TestPutIcebergWithHiveCatalog {
                 "department_bucket=0", "department_bucket=1", "department_bucket=2"));
     }
 
-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"orc"})
     public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
@@ -175,14 +180,14 @@ public class TestPutIcebergWithHiveCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -203,7 +208,6 @@ public class TestPutIcebergWithHiveCatalog {
                 "department=Finance", "department=Marketing", "department=Sales"));
     }
 
-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"parquet"})
     public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
@@ -214,14 +218,14 @@ public class TestPutIcebergWithHiveCatalog {
 
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(spec, fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(spec, fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")
@@ -246,20 +250,19 @@ public class TestPutIcebergWithHiveCatalog {
         ));
     }
 
-    @DisabledOnOs(WINDOWS)
     @ParameterizedTest
     @ValueSource(strings = {"avro"})
     public void onTriggerUnPartitioned(String fileFormat) throws Exception {
         runner = TestRunners.newTestRunner(processor);
         initRecordReader();
-        Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat);
-        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
-        runner.setProperty(PutIceberg.TABLE_NAME, "users");
+        initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+        runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+        runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
         runner.setValidateExpressionUsage(false);
         runner.enqueue(new byte[0]);
         runner.run();
 
-        Table table = catalog.loadTable(TABLE_IDENTIFIER);
+        Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
 
         List<Record> expectedRecords = IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
                 .add(0, "John", "Finance")

Reply via email to