This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 44a7f7f38b NIFI-11440 Speed up Iceberg Hive Metastore Tests
44a7f7f38b is described below
commit 44a7f7f38b3950a8a0135d1ce8602a4890c3d995
Author: Mark Bathori <[email protected]>
AuthorDate: Thu Apr 13 11:49:52 2023 +0200
NIFI-11440 Speed up Iceberg Hive Metastore Tests
This closes #7170
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/hive/metastore/ThriftMetastore.java | 10 +--
.../hive/TestTriggerHiveMetaStoreEvent.java | 15 +++--
.../iceberg/TestPutIcebergWithHiveCatalog.java | 71 +++++++++++-----------
3 files changed, 51 insertions(+), 45 deletions(-)
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
index 5e1425d667..2b3dc4da5c 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-test-utils/src/main/java/org/apache/nifi/hive/metastore/ThriftMetastore.java
@@ -19,15 +19,15 @@ package org.apache.nifi.hive.metastore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.junit.jupiter.api.extension.AfterEachCallback;
-import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.HashMap;
import java.util.Map;
/** A JUnit Extension that creates a Hive Metastore Thrift service backed by a
Hive Metastore using an in-memory Derby database. */
-public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
+public class ThriftMetastore implements BeforeAllCallback, AfterAllCallback {
private final MetastoreCore metastoreCore;
@@ -43,12 +43,12 @@ public class ThriftMetastore implements BeforeEachCallback, AfterEachCallback {
}
@Override
- public void beforeEach(ExtensionContext context) throws Exception {
+ public void beforeAll(ExtensionContext context) throws Exception {
metastoreCore.initialize(configOverrides);
}
@Override
- public void afterEach(ExtensionContext context) {
+ public void afterAll(ExtensionContext context) {
metastoreCore.shutdown();
}
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
index 0cd52f801c..72e312439f 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestTriggerHiveMetaStoreEvent.java
@@ -37,6 +37,8 @@ import org.apache.nifi.hive.metastore.ThriftMetastore;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -56,6 +58,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
+@DisabledOnOs(WINDOWS)
public class TestTriggerHiveMetaStoreEvent {
private TestRunner runner;
@@ -71,7 +74,7 @@ public class TestTriggerHiveMetaStoreEvent {
);
@RegisterExtension
- public ThriftMetastore metastore = new ThriftMetastore()
+ public static ThriftMetastore metastore = new ThriftMetastore()
.withConfigOverrides(Collections.singletonMap(TRANSACTIONAL_EVENT_LISTENERS.getVarname(),
"org.apache.hive.hcatalog.listener.DbNotificationListener"));
@BeforeEach
@@ -80,6 +83,11 @@ public class TestTriggerHiveMetaStoreEvent {
metaStoreClient = metastore.getMetaStoreClient();
}
+ @AfterEach
+ public void tearDown() throws TException {
+ metaStoreClient.dropTable(TEST_DATABASE_NAME, TEST_TABLE_NAME);
+ }
+
private void initUnPartitionedTable() throws Exception {
createTable(TEST_DATABASE_NAME, TEST_TABLE_NAME,
Collections.emptyList(), metastore.getWarehouseLocation() + "/" +
TEST_TABLE_NAME);
}
@@ -91,7 +99,6 @@ public class TestTriggerHiveMetaStoreEvent {
createPartition(table, Lists.newArrayList("2018", "march"));
}
- @DisabledOnOs(WINDOWS)
@Test
public void testInsertOnUnPartitionedTable() throws Exception {
initUnPartitionedTable();
@@ -126,7 +133,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
}
- @DisabledOnOs(WINDOWS)
@Test
public void testInsertOnPartitionedTable() throws Exception {
initPartitionedTable();
@@ -162,7 +168,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertEquals(insertMessage.getTable(), TEST_TABLE_NAME);
}
- @DisabledOnOs(WINDOWS)
@Test
public void testAddPartition() throws Exception {
initPartitionedTable();
@@ -204,7 +209,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertDoesNotThrow(() ->
metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME,
Arrays.asList("2017", "june")));
}
- @DisabledOnOs(WINDOWS)
@Test
public void testDropPartition() throws Exception {
initPartitionedTable();
@@ -246,7 +250,6 @@ public class TestTriggerHiveMetaStoreEvent {
assertThrows(NoSuchObjectException.class, () ->
metaStoreClient.getPartition(TEST_DATABASE_NAME, TEST_TABLE_NAME,
Arrays.asList("2017", "june")));
}
- @DisabledOnOs(WINDOWS)
@Test
public void testUnknownEventType() throws Exception {
initUnPartitionedTable();
diff --git
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
index bd4f959c43..fb5f5ce41a 100644
--- a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
+++ b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/test/java/org/apache/nifi/processors/iceberg/TestPutIcebergWithHiveCatalog.java
@@ -22,7 +22,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
@@ -38,6 +37,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.condition.DisabledOnOs;
@@ -60,18 +60,23 @@ import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validateN
import static org.apache.nifi.processors.iceberg.util.IcebergTestUtils.validatePartitionFolders;
import static org.junit.jupiter.api.condition.OS.WINDOWS;
+@DisabledOnOs(WINDOWS)
public class TestPutIcebergWithHiveCatalog {
private TestRunner runner;
private PutIceberg processor;
private Schema inputSchema;
+ private TestHiveCatalogService catalogService;
@RegisterExtension
- public ThriftMetastore metastore = new ThriftMetastore();
+ public static ThriftMetastore metastore = new ThriftMetastore();
- private static final Namespace NAMESPACE = Namespace.of("test_metastore");
+ private static final String CATALOG_NAME = "test_metastore";
+ private static final String TABLE_NAME = "users";
- private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, "users");
+ private static final Namespace NAMESPACE = Namespace.of(CATALOG_NAME);
+
+ private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
private static final org.apache.iceberg.Schema USER_SCHEMA = new org.apache.iceberg.Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
@@ -85,6 +90,16 @@ public class TestPutIcebergWithHiveCatalog {
inputSchema = new Schema.Parser().parse(avroSchema);
processor = new PutIceberg();
+
+ catalogService = new TestHiveCatalogService.Builder()
+ .withMetastoreUri(metastore.getThriftConnectionUri())
+ .withWarehouseLocation(metastore.getWarehouseLocation())
+ .build();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ catalogService.getCatalog().dropTable(TABLE_IDENTIFIER);
}
private void initRecordReader() throws InitializationException {
@@ -106,28 +121,19 @@ public class TestPutIcebergWithHiveCatalog {
runner.setProperty(PutIceberg.RECORD_READER, "mock-reader-factory");
}
- private Catalog initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
- TestHiveCatalogService catalogService = new TestHiveCatalogService.Builder()
- .withMetastoreUri(metastore.getThriftConnectionUri())
- .withWarehouseLocation(metastore.getWarehouseLocation())
- .build();
- Catalog catalog = catalogService.getCatalog();
-
+ private void initCatalog(PartitionSpec spec, String fileFormat) throws InitializationException {
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.FORMAT_VERSION, "2");
tableProperties.put(TableProperties.DEFAULT_FILE_FORMAT, fileFormat);
- catalog.createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
+ catalogService.getCatalog().createTable(TABLE_IDENTIFIER, USER_SCHEMA, spec, tableProperties);
runner.addControllerService("catalog-service", catalogService);
runner.enableControllerService(catalogService);
runner.setProperty(PutIceberg.CATALOG, "catalog-service");
-
- return catalog;
}
- @DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"avro"})
public void onTriggerPartitioned(String fileFormat) throws Exception {
@@ -137,14 +143,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- Catalog catalog = initCatalog(spec, fileFormat);
- runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
- runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+ runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
- Table table = catalog.loadTable(TABLE_IDENTIFIER);
+ Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords =
IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@@ -165,7 +171,6 @@ public class TestPutIcebergWithHiveCatalog {
"department_bucket=0", "department_bucket=1",
"department_bucket=2"));
}
- @DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"orc"})
public void onTriggerIdentityPartitioned(String fileFormat) throws Exception {
@@ -175,14 +180,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- Catalog catalog = initCatalog(spec, fileFormat);
- runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
- runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+ runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
- Table table = catalog.loadTable(TABLE_IDENTIFIER);
+ Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords =
IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@@ -203,7 +208,6 @@ public class TestPutIcebergWithHiveCatalog {
"department=Finance", "department=Marketing",
"department=Sales"));
}
- @DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"parquet"})
public void onTriggerMultiLevelIdentityPartitioned(String fileFormat) throws Exception {
@@ -214,14 +218,14 @@ public class TestPutIcebergWithHiveCatalog {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- Catalog catalog = initCatalog(spec, fileFormat);
- runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
- runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ initCatalog(spec, fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+ runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
- Table table = catalog.loadTable(TABLE_IDENTIFIER);
+ Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords =
IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")
@@ -246,20 +250,19 @@ public class TestPutIcebergWithHiveCatalog {
));
}
- @DisabledOnOs(WINDOWS)
@ParameterizedTest
@ValueSource(strings = {"avro"})
public void onTriggerUnPartitioned(String fileFormat) throws Exception {
runner = TestRunners.newTestRunner(processor);
initRecordReader();
- Catalog catalog = initCatalog(PartitionSpec.unpartitioned(), fileFormat);
- runner.setProperty(PutIceberg.CATALOG_NAMESPACE, "test_metastore");
- runner.setProperty(PutIceberg.TABLE_NAME, "users");
+ initCatalog(PartitionSpec.unpartitioned(), fileFormat);
+ runner.setProperty(PutIceberg.CATALOG_NAMESPACE, CATALOG_NAME);
+ runner.setProperty(PutIceberg.TABLE_NAME, TABLE_NAME);
runner.setValidateExpressionUsage(false);
runner.enqueue(new byte[0]);
runner.run();
- Table table = catalog.loadTable(TABLE_IDENTIFIER);
+ Table table = catalogService.getCatalog().loadTable(TABLE_IDENTIFIER);
List<Record> expectedRecords =
IcebergTestUtils.RecordsBuilder.newInstance(USER_SCHEMA)
.add(0, "John", "Finance")