This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e32a5b1 Add ControllerPeriodicTask integration tests (#4130)
e32a5b1 is described below
commit e32a5b1634f221e31bd8dc49a2284b27a1bb4221
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Apr 17 12:56:07 2019 -0700
Add ControllerPeriodicTask integration tests (#4130)
Refactor SegmentStatusCheckerIntegrationTest to be able to handle all
daemons in same test
Add integration test for RealtimeSegmentRelocator periodic task
---
.../apache/pinot/controller/ControllerConf.java | 12 +
.../core/relocation/RealtimeSegmentRelocator.java | 2 +-
.../tests/BaseClusterIntegrationTest.java | 15 +
.../pinot/integration/tests/ClusterTest.java | 10 +
.../ControllerPeriodicTasksIntegrationTests.java | 422 +++++++++++++++++++++
.../tests/HybridClusterIntegrationTest.java | 10 +-
.../tests/RealtimeClusterIntegrationTest.java | 12 +-
.../tasks/SegmentStatusCheckerIntegrationTest.java | 288 --------------
8 files changed, 463 insertions(+), 308 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1faa949..2372ac0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -96,6 +96,8 @@ public class ControllerConf extends PropertiesConfiguration {
"controller.retentionManager.initialDelayInSeconds";
private static final String
OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS =
"controller.offlineSegmentIntervalChecker.initialDelayInSeconds";
+ private static final String
REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS =
+ "controller.realtimeSegmentRelocation.initialDelayInSeconds";
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
@@ -593,6 +595,11 @@ public class ControllerConf extends
PropertiesConfiguration {
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
}
+ public long getRealtimeSegmentRelocationInitialDelayInSeconds() {
+ return
getLong(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+ ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
+ }
+
public long getOfflineSegmentIntervalCheckerInitialDelayInSeconds() {
return
getLong(ControllerPeriodicTasksConf.OFFLINE_SEGMENT_INTERVAL_CHECKER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
@@ -602,6 +609,11 @@ public class ControllerConf extends
PropertiesConfiguration {
setProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
initialDelayInSeconds);
}
+ public void setRealtimeSegmentRelocationInitialDelayInSeconds(long
initialDelayInSeconds) {
+
setProperty(ControllerPeriodicTasksConf.REALTIME_SEGMENT_RELOCATION_INITIAL_DELAY_IN_SECONDS,
+ initialDelayInSeconds);
+ }
+
public long getPeriodicTaskInitialDelayInSeconds() {
return ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds();
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index 0a9e352..ea5b05b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -58,7 +58,7 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask<Void> {
public RealtimeSegmentRelocator(PinotHelixResourceManager
pinotHelixResourceManager, ControllerConf config,
ControllerMetrics controllerMetrics) {
super("RealtimeSegmentRelocator",
getRunFrequencySeconds(config.getRealtimeSegmentRelocatorFrequency()),
- config.getPeriodicTaskInitialDelayInSeconds(),
pinotHelixResourceManager, controllerMetrics);
+ config.getRealtimeSegmentRelocationInitialDelayInSeconds(),
pinotHelixResourceManager, controllerMetrics);
}
@Override
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 36e67ab..fed0b68 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.Executor;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import kafka.server.KafkaServerStartable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.config.TableTaskConfig;
@@ -68,6 +69,7 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
protected final File _avroDir = new File(_tempDir, "avroDir");
protected final File _segmentDir = new File(_tempDir, "segmentDir");
protected final File _tarDir = new File(_tempDir, "tarDir");
+ protected List<KafkaServerStartable> _kafkaStarters;
private org.apache.pinot.client.Connection _pinotConnection;
private Connection _h2Connection;
@@ -311,6 +313,19 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
});
}
+ protected void startKafka() {
+ _kafkaStarters = KafkaStarterUtils
+ .startServers(getNumKafkaBrokers(),
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
+ KafkaStarterUtils.getDefaultKafkaConfiguration());
+ KafkaStarterUtils.createTopic(getKafkaTopic(),
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+ }
+
+ protected void stopKafka() {
+ for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
+ KafkaStarterUtils.stopServer(kafkaStarter);
+ }
+ }
+
/**
* Get current result for "SELECT COUNT(*)".
*
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 5d9ba61..d127779 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -50,6 +50,7 @@ import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.config.TableTaskConfig;
+import org.apache.pinot.common.config.TenantConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.CommonConstants.Broker;
import org.apache.pinot.common.utils.CommonConstants.Helix;
@@ -463,6 +464,15 @@ public abstract class ClusterTest extends ControllerTest {
_realtimeTableConfig.toJsonConfigString());
}
+ protected void updateRealtimeTableTenant(String tableName, TenantConfig
tenantConfig)
+ throws Exception {
+
+ _realtimeTableConfig.setTenantConfig(tenantConfig);
+
+
sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tableName),
+ _realtimeTableConfig.toJsonConfigString());
+ }
+
protected void dropRealtimeTable(String tableName)
throws Exception {
sendDeleteRequest(
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
new file mode 100644
index 0000000..9535cf6
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTests.java
@@ -0,0 +1,422 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import kafka.server.KafkaServerStartable;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.config.TagOverrideConfig;
+import org.apache.pinot.common.config.TenantConfig;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMeter;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.KafkaStarterUtils;
+import org.apache.pinot.common.utils.helix.HelixHelper;
+import org.apache.pinot.common.utils.retry.RetryPolicies;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterGroups;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeGroups;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration tests for all {@link
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask}
+ * The intention of these tests is not to test functionality of daemons,
+ * but simply to check that they run as expected and process the tables when
the controller starts.
+ *
+ * Cluster setup/teardown is common across all tests in the @BeforeClass
method {@link ControllerPeriodicTasksIntegrationTests#setUp}.
+ * This includes:
+ * zk, controller, 1 broker, 3 offline servers, 3 realtime servers, kafka with
avro loaded, offline table with segments from avro
+ *
+ * There will be a separate beforeTask(), testTask() and afterTask() for each
ControllerPeriodicTask test, grouped by task name.
+ * See group = "segmentStatusChecker" for example.
+ * The tables needed for the test will be created in beforeTask(), and dropped
in afterTask()
+ *
+ * The groups run sequentially in the order: segmentStatusChecker ->
realtimeSegmentRelocator -> ....
+ */
+public class ControllerPeriodicTasksIntegrationTests extends
BaseClusterIntegrationTestSet {
+
+ private static final String TENANT_NAME = "TestTenant";
+ private static final String DEFAULT_TABLE_NAME = "mytable";
+
+ private static final int PERIODIC_TASK_INITIAL_DELAY_SECONDS = 60;
+ private static final int PERIODIC_TASK_FREQ_SECONDS = 5;
+ private static final String PERIODIC_TASK_FREQ = "5s";
+
+ private String _currentTableName;
+ private List<File> _avroFiles;
+
+ /**
+ * Setup the cluster for the tests
+ * @throws Exception
+ */
+ @BeforeClass
+ public void setUp() throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ startZk();
+ startKafka();
+
+ // Set initial delay of 60 seconds for periodic tasks, to allow time for
tables setup.
+ // Run at 5 seconds freq in order to keep them running, in case first run
happens before table setup
+ ControllerConf controllerConf = getDefaultControllerConfiguration();
+ controllerConf.setTenantIsolationEnabled(false);
+
controllerConf.setStatusCheckerInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+
controllerConf.setStatusCheckerFrequencyInSeconds(PERIODIC_TASK_FREQ_SECONDS);
+
controllerConf.setRealtimeSegmentRelocationInitialDelayInSeconds(PERIODIC_TASK_INITIAL_DELAY_SECONDS);
+ controllerConf.setRealtimeSegmentRelocatorFrequency(PERIODIC_TASK_FREQ);
+
+ startController(controllerConf);
+ startBroker();
+ startServers(6);
+
+ // Create tenants
+ createBrokerTenant(TENANT_NAME, 1);
+ createServerTenant(TENANT_NAME, 3, 3);
+
+ // unpack avro into _tempDir
+ _avroFiles = unpackAvroData(_tempDir);
+
+ // setup a default offline table, shared across all tests. Each test can
create additional tables and destroy them
+ setupOfflineTableAndSegments(DEFAULT_TABLE_NAME, _avroFiles);
+
+ // push avro into kafka, each test can create the realtime table and
destroy it
+ ExecutorService executor = Executors.newCachedThreadPool();
+ pushAvroIntoKafka(_avroFiles, getKafkaTopic(), executor);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Setup offline table, but no segments
+ */
+ private void setupOfflineTable(String table) throws Exception {
+ _realtimeTableConfig = null;
+ addOfflineTable(table, null, null, TENANT_NAME, TENANT_NAME, null,
SegmentVersion.v1, null, null, null);
+ completeTableConfiguration();
+ }
+
+ /**
+ * Setup offline table, with segments from avro
+ */
+ private void setupOfflineTableAndSegments(String table, List<File>
avroFiles) throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+ setTableName(table);
+ _realtimeTableConfig = null;
+
+ File schemaFile = getSchemaFile();
+ Schema schema = Schema.fromFile(schemaFile);
+ String schemaName = schema.getSchemaName();
+ addSchema(schemaFile, schemaName);
+
+ String timeColumnName = schema.getTimeColumnName();
+ Assert.assertNotNull(timeColumnName);
+ TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
+ Assert.assertNotNull(outgoingTimeUnit);
+ String timeType = outgoingTimeUnit.toString();
+
+ addOfflineTable(table, timeColumnName, timeType, TENANT_NAME, TENANT_NAME,
null, SegmentVersion.v1, null, null, null);
+ completeTableConfiguration();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0,
_segmentDir, _tarDir, table, false,
+ null, null, null, executor);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+ uploadSegments(_tarDir);
+ waitForAllDocsLoaded(600_000L);
+ }
+
+ /**
+ * Setup realtime table for given tablename and topic
+ */
+ private void setupRealtimeTable(String table, String topic, File avroFile)
throws Exception {
+ _offlineTableConfig = null;
+ File schemaFile = getSchemaFile();
+ Schema schema = Schema.fromFile(schemaFile);
+ String schemaName = schema.getSchemaName();
+ addSchema(schemaFile, schemaName);
+
+ String timeColumnName = schema.getTimeColumnName();
+ Assert.assertNotNull(timeColumnName);
+ TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
+ Assert.assertNotNull(outgoingTimeUnit);
+ String timeType = outgoingTimeUnit.toString();
+
+ addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER,
KafkaStarterUtils.DEFAULT_ZK_STR, topic,
+ getRealtimeSegmentFlushSize(), avroFile, timeColumnName, timeType,
schemaName, TENANT_NAME, TENANT_NAME,
+ getLoadMode(), getSortedColumn(), getInvertedIndexColumns(),
getBloomFilterIndexColumns(), getRawIndexColumns(),
+ getTaskConfig(), getStreamConsumerFactoryClassName());
+ completeTableConfiguration();
+ }
+
+ @Override
+ public String getTableName() {
+ return _currentTableName;
+ }
+
+ private void setTableName(String tableName) {
+ _currentTableName = tableName;
+ }
+
+ /**
+ * Group - segmentStatusChecker - Integration tests for {@link
org.apache.pinot.controller.helix.SegmentStatusChecker}
+ * @throws Exception
+ */
+ @BeforeGroups(groups = "segmentStatusChecker")
+ public void beforeTestSegmentStatusCheckerTest(ITestContext context) throws
Exception {
+ String emptyTable = "table1_OFFLINE";
+ String disabledOfflineTable = "table2_OFFLINE";
+ String basicOfflineTable = getDefaultOfflineTableName();
+ String errorOfflineTable = "table4_OFFLINE";
+ String basicRealtimeTable = getDefaultRealtimeTableName();
+ int numTables = 5;
+
+ context.setAttribute("emptyTable", emptyTable);
+ context.setAttribute("disabledOfflineTable", disabledOfflineTable);
+ context.setAttribute("basicOfflineTable", basicOfflineTable);
+ context.setAttribute("errorOfflineTable", errorOfflineTable);
+ context.setAttribute("basicRealtimeTable", basicRealtimeTable);
+ context.setAttribute("numTables", numTables);
+
+ // empty table
+ setupOfflineTable(emptyTable);
+
+ // table with disabled ideal state
+ setupOfflineTable(disabledOfflineTable);
+ _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
+
+ // some segments offline
+ setupOfflineTableAndSegments(errorOfflineTable, _avroFiles);
+ HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new
Function<IdealState, IdealState>() {
+ @Nullable
+ @Override
+ public IdealState apply(@Nullable IdealState input) {
+ List<String> segmentNames =
Lists.newArrayList(input.getPartitionSet());
+ Collections.sort(segmentNames);
+
+ Map<String, String> instanceStateMap1 =
input.getInstanceStateMap(segmentNames.get(0));
+ for (String instance : instanceStateMap1.keySet()) {
+ instanceStateMap1.put(instance, "OFFLINE");
+ break;
+ }
+ return input;
+ }
+ }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
+
+ // setup default realtime table
+ setupRealtimeTable(basicRealtimeTable, getKafkaTopic(), _avroFiles.get(0));
+ }
+
+ /**
+ * After 1 run of SegmentStatusChecker the controllerMetrics will be set for
each table
+ * Validate that we are seeing the expected numbers
+ */
+ @Test(groups = "segmentStatusChecker")
+ public void testSegmentStatusChecker(ITestContext context) throws Exception {
+ String emptyTable = (String) context.getAttribute("emptyTable");
+ String disabledOfflineTable = (String)
context.getAttribute("disabledOfflineTable");
+ String basicOfflineTable = (String)
context.getAttribute("basicOfflineTable");
+ String errorOfflineTable = (String)
context.getAttribute("errorOfflineTable");
+ String basicRealtimeTable = (String)
context.getAttribute("basicRealtimeTable");
+ int numTables = (int) context.getAttribute("numTables");
+
+ ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
+
+ TestUtils.waitForCondition(input ->
+
controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+ "SegmentStatusChecker") >= numTables, 240_000, "Timed out waiting
for SegmentStatusChecker");
+
+ // empty table - table1_OFFLINE
+ // num replicas set from ideal state
+ checkSegmentStatusCheckerMetrics(controllerMetrics, emptyTable, null, 3,
100, 0, 100);
+
+ // disabled table - table2_OFFLINE
+ // reset to defaults
+ checkSegmentStatusCheckerMetrics(controllerMetrics, disabledOfflineTable,
null, Long.MIN_VALUE, Long.MIN_VALUE,
+ Long.MIN_VALUE, Long.MIN_VALUE);
+
+ // happy path table - mytable_OFFLINE
+ IdealState idealState =
_helixResourceManager.getTableIdealState(basicOfflineTable);
+ checkSegmentStatusCheckerMetrics(controllerMetrics, basicOfflineTable,
idealState, 3, 100, 0, 100);
+
+ // offline segments - table4_OFFLINE
+ // 2 replicas available out of 3, percent 66
+ idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
+ checkSegmentStatusCheckerMetrics(controllerMetrics, errorOfflineTable,
idealState, 2, 66, 0, 100);
+
+ // happy path table - mytable_REALTIME
+ idealState = _helixResourceManager.getTableIdealState(basicRealtimeTable);
+ checkSegmentStatusCheckerMetrics(controllerMetrics, basicRealtimeTable,
idealState, 1, 100, 0, 100);
+
+ // Total metrics
+
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT),
4);
+
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT),
1);
+
Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT),
1);
+ }
+
+ private void checkSegmentStatusCheckerMetrics(ControllerMetrics
controllerMetrics, String tableName,
+ IdealState idealState, long numReplicas, long percentReplicas, long
segmentsInErrorState,
+ long percentSegmentsAvailable) {
+ if (idealState != null) {
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.IDEALSTATE_ZNODE_SIZE),
+ idealState.toString().length());
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENT_COUNT),
+ (long) (idealState.getPartitionSet().size()));
+ }
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.NUMBER_OF_REPLICAS),
+ numReplicas);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.PERCENT_OF_REPLICAS),
+ percentReplicas);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.SEGMENTS_IN_ERROR_STATE),
+ segmentsInErrorState);
+ Assert.assertEquals(controllerMetrics.getValueOfTableGauge(tableName,
ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
+ percentSegmentsAvailable);
+ }
+
+ @AfterGroups(groups = "segmentStatusChecker")
+ public void afterTestSegmentStatusChecker(ITestContext context) throws
Exception {
+ String emptyTable = (String) context.getAttribute("emptyTable");
+ String disabledOfflineTable = (String)
context.getAttribute("disabledOfflineTable");
+ String errorOfflineTable = (String)
context.getAttribute("errorOfflineTable");
+ String basicRealtimeTable = (String)
context.getAttribute("basicRealtimeTable");
+
+ dropOfflineTable(emptyTable);
+ dropOfflineTable(disabledOfflineTable);
+ dropOfflineTable(errorOfflineTable);
+ dropOfflineTable(basicRealtimeTable);
+ }
+
+ /**
+ * Group - realtimeSegmentRelocator - Integration tests for {@link
org.apache.pinot.controller.helix.core.relocation.RealtimeSegmentRelocator}
+ * @param context
+ * @throws Exception
+ */
+ @BeforeGroups(groups = "realtimeSegmentRelocator", dependsOnGroups =
"segmentStatusChecker")
+ public void beforeRealtimeSegmentRelocatorTest(ITestContext context) throws
Exception {
+ String relocationTable = getDefaultRealtimeTableName();
+ context.setAttribute("relocationTable", relocationTable);
+
+ // setup default realtime table
+ setupRealtimeTable(relocationTable, getKafkaTopic(), _avroFiles.get(0));
+
+ // add tag override for relocation
+ TenantConfig tenantConfig = new TenantConfig();
+ tenantConfig.setServer(TENANT_NAME);
+ tenantConfig.setBroker(TENANT_NAME);
+ TagOverrideConfig tagOverrideConfig = new TagOverrideConfig();
+ tagOverrideConfig.setRealtimeConsuming(TENANT_NAME + "_REALTIME");
+ tagOverrideConfig.setRealtimeCompleted(TENANT_NAME + "_OFFLINE");
+ tenantConfig.setTagOverrideConfig(tagOverrideConfig);
+
updateRealtimeTableTenant(TableNameBuilder.extractRawTableName(relocationTable),
tenantConfig);
+ }
+
+ @Test(groups = "realtimeSegmentRelocator", dependsOnGroups =
"segmentStatusChecker")
+ public void testRealtimeSegmentRelocator(ITestContext context) throws
Exception {
+
+ String relocationTable = (String) context.getAttribute("relocationTable");
+
+ ControllerMetrics controllerMetrics =
_controllerStarter.getControllerMetrics();
+
+ long taskRunCount =
controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
+ ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN).count();
+ TestUtils.waitForCondition(input ->
+ controllerMetrics.getMeteredTableValue("RealtimeSegmentRelocator",
ControllerMeter.CONTROLLER_PERIODIC_TASK_RUN)
+ .count() > taskRunCount, 60_000, "Timed out waiting for
RealtimeSegmentRelocation to run");
+
+
Assert.assertTrue(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
+ "RealtimeSegmentRelocator") > 0);
+
+ // check that the servers hosting ONLINE segments and the servers hosting
CONSUMING segments are disjoint sets
+ Set<String> consuming = new HashSet<>();
+ Set<String> completed = new HashSet<>();
+ IdealState tableIdealState =
_helixResourceManager.getTableIdealState(relocationTable);
+ for (String partition : tableIdealState.getPartitionSet()) {
+ Map<String, String> instanceStateMap =
tableIdealState.getInstanceStateMap(partition);
+ if (instanceStateMap.containsValue("CONSUMING")) {
+ consuming.addAll(instanceStateMap.keySet());
+ }
+ if (instanceStateMap.containsValue("ONLINE")) {
+ completed.addAll(instanceStateMap.keySet());
+ }
+ }
+
+ Assert.assertTrue(Collections.disjoint(consuming, completed));
+ }
+
+ @AfterGroups(groups = "realtimeSegmentRelocator", dependsOnGroups =
"segmentStatusChecker")
+ public void afterRealtimeSegmentRelocatorTest(ITestContext context) throws
Exception {
+ String relocationTable = (String) context.getAttribute("relocationTable");
+ dropRealtimeTable(relocationTable);
+ }
+
+ // TODO: tests for other ControllerPeriodicTasks (RetentionManager,
BrokerValidationManager, OfflineSegmentIntervalChecker,
RealtimeSegmentValidationManager)
+
+ @Override
+ protected boolean isUsingNewConfigFormat() {
+ return true;
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ private String getDefaultOfflineTableName() {
+ return DEFAULT_TABLE_NAME + "_OFFLINE";
+ }
+
+ private String getDefaultRealtimeTableName() {
+ return DEFAULT_TABLE_NAME + "_REALTIME";
+ }
+
+ /**
+ * Tear down the cluster after tests
+ * @throws Exception
+ */
+ @AfterClass
+ public void tearDown() throws Exception {
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index c077e8a..fa88396 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -51,7 +51,6 @@ public class HybridClusterIntegrationTest extends
BaseClusterIntegrationTestSet
private static final int NUM_OFFLINE_SEGMENTS = 8;
private static final int NUM_REALTIME_SEGMENTS = 6;
- private KafkaServerStartable _kafkaStarter;
private Schema _schema;
protected int getNumOfflineSegments() {
@@ -109,12 +108,7 @@ public class HybridClusterIntegrationTest extends
BaseClusterIntegrationTestSet
throws Exception {
// Start Zk and Kafka
startZk();
- _kafkaStarter = KafkaStarterUtils
- .startServer(KafkaStarterUtils.DEFAULT_KAFKA_PORT,
KafkaStarterUtils.DEFAULT_BROKER_ID,
- KafkaStarterUtils.DEFAULT_ZK_STR,
KafkaStarterUtils.getDefaultKafkaConfiguration());
-
- // Create Kafka topic
- KafkaStarterUtils.createTopic(getKafkaTopic(),
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
+ startKafka();
// Start the Pinot cluster
ControllerConf config = getDefaultControllerConfiguration();
@@ -301,7 +295,7 @@ public class HybridClusterIntegrationTest extends
BaseClusterIntegrationTestSet
stopServer();
stopBroker();
stopController();
- KafkaStarterUtils.stopServer(_kafkaStarter);
+ stopKafka();
stopZk();
cleanup();
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
index 419a3cf..37df2e4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java
@@ -39,7 +39,6 @@ import org.testng.annotations.Test;
* Integration test that creates a Kafka broker, creates a Pinot cluster that
consumes from Kafka and queries Pinot.
*/
public class RealtimeClusterIntegrationTest extends
BaseClusterIntegrationTestSet {
- private List<KafkaServerStartable> _kafkaStarters;
@BeforeClass
public void setUp()
@@ -79,13 +78,6 @@ public class RealtimeClusterIntegrationTest extends
BaseClusterIntegrationTestSe
waitForAllDocsLoaded(600_000L);
}
- protected void startKafka() {
- _kafkaStarters = KafkaStarterUtils
- .startServers(getNumKafkaBrokers(),
KafkaStarterUtils.DEFAULT_KAFKA_PORT, KafkaStarterUtils.DEFAULT_ZK_STR,
- KafkaStarterUtils.getDefaultKafkaConfiguration());
- KafkaStarterUtils.createTopic(getKafkaTopic(),
KafkaStarterUtils.DEFAULT_ZK_STR, getNumKafkaPartitions());
- }
-
protected void setUpTable(File avroFile)
throws Exception {
File schemaFile = getSchemaFile();
@@ -179,9 +171,7 @@ public class RealtimeClusterIntegrationTest extends
BaseClusterIntegrationTestSe
stopServer();
stopBroker();
stopController();
- for (KafkaServerStartable kafkaStarter : _kafkaStarters) {
- KafkaStarterUtils.stopServer(kafkaStarter);
- }
+ stopKafka();
stopZk();
FileUtils.deleteDirectory(_tempDir);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
deleted file mode 100644
index 9966840..0000000
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/controller/periodic/tasks/SegmentStatusCheckerIntegrationTest.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.integration.tests.controller.periodic.tasks;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-import java.io.File;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import javax.annotation.Nullable;
-import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.data.Schema;
-import org.apache.pinot.common.metrics.ControllerGauge;
-import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.utils.KafkaStarterUtils;
-import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.common.utils.retry.RetryPolicies;
-import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet;
-import org.apache.pinot.integration.tests.ClusterIntegrationTestUtils;
-import org.apache.pinot.util.TestUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-
-/**
- * Integration test to check {@link org.apache.pinot.controller.helix.SegmentStatusChecker} is
- * running and verify the metrics emitted
- */
-public class SegmentStatusCheckerIntegrationTest extends BaseClusterIntegrationTestSet {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(SegmentStatusCheckerIntegrationTest.class);
-
- private String emptyTable = "table1_OFFLINE";
- private String disabledOfflineTable = "table2_OFFLINE";
- private String basicOfflineTable = "table3_OFFLINE";
- private String errorOfflineTable = "table4_OFFLINE";
- private String realtimeTableErrorState = "table5_REALTIME";
- private String _currentTableName;
- private static final int NUM_TABLES = 5;
-
- private static final int SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS = 60;
- private static final int SEGMENT_STATUS_CHECKER_FREQ_SECONDS = 5;
-
- @BeforeClass
- public void setUp() throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
-
- startZk();
-
- // Set initial delay of 60 seconds for the segment status checker, to allow time for tables setup.
- // Run at 5 seconds freq in order to keep it running, in case first run happens before table setup
- ControllerConf controllerConf = getDefaultControllerConfiguration();
- controllerConf.setStatusCheckerInitialDelayInSeconds(SEGMENT_STATUS_CHECKER_INITIAL_DELAY_SECONDS);
- controllerConf.setStatusCheckerFrequencyInSeconds(SEGMENT_STATUS_CHECKER_FREQ_SECONDS);
-
- startController(controllerConf);
- startBroker();
- startServers(3);
-
- // empty table
- setupOfflineTable(emptyTable);
-
- // table with disabled ideal state
- setupOfflineTable(disabledOfflineTable);
- _helixAdmin.enableResource(_clusterName, disabledOfflineTable, false);
-
- // happy case table
- setupOfflineTableAndSegments(basicOfflineTable);
-
- // some segments offline
- setupOfflineTableAndSegments(errorOfflineTable);
- HelixHelper.updateIdealState(_helixManager, errorOfflineTable, new Function<IdealState, IdealState>() {
- @Nullable
- @Override
- public IdealState apply(@Nullable IdealState input) {
- List<String> segmentNames = Lists.newArrayList(input.getPartitionSet());
- Collections.sort(segmentNames);
-
- Map<String, String> instanceStateMap1 = input.getInstanceStateMap(segmentNames.get(0));
- for (String instance : instanceStateMap1.keySet()) {
- instanceStateMap1.put(instance, "OFFLINE");
- break;
- }
- return input;
- }
- }, RetryPolicies.fixedDelayRetryPolicy(2, 10));
-
- // realtime table with segments in error state
- setupRealtimeTable(realtimeTableErrorState);
- }
-
- private void setupOfflineTable(String table) throws Exception {
- _realtimeTableConfig = null;
- addOfflineTable(table);
- completeTableConfiguration();
- }
-
- private void setupOfflineTableAndSegments(String table) throws Exception {
- TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
- setTableName(table);
- _realtimeTableConfig = null;
- addOfflineTable(table);
- completeTableConfiguration();
- List<File> avroFiles = unpackAvroData(_tempDir);
- ExecutorService executor = Executors.newCachedThreadPool();
- ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, table, false, null, null,
- null, executor);
- executor.shutdown();
- executor.awaitTermination(10, TimeUnit.MINUTES);
- uploadSegments(_tarDir);
-
- waitForAllDocsLoaded(600_000L);
- }
-
- private void setupRealtimeTable(String table) throws Exception {
- _offlineTableConfig = null;
- File schemaFile = getSchemaFile();
- Schema schema = Schema.fromFile(schemaFile);
- String schemaName = schema.getSchemaName();
- addSchema(schemaFile, schemaName);
-
- String timeColumnName = schema.getTimeColumnName();
- Assert.assertNotNull(timeColumnName);
- TimeUnit outgoingTimeUnit = schema.getOutgoingTimeUnit();
- Assert.assertNotNull(outgoingTimeUnit);
- String timeType = outgoingTimeUnit.toString();
-
- addRealtimeTable(table, useLlc(), KafkaStarterUtils.DEFAULT_KAFKA_BROKER, KafkaStarterUtils.DEFAULT_ZK_STR,
- getKafkaTopic(), getRealtimeSegmentFlushSize(), null, timeColumnName, timeType, schemaName, null, null,
- getLoadMode(), getSortedColumn(), getInvertedIndexColumns(), getBloomFilterIndexColumns(), getRawIndexColumns(),
- getTaskConfig(), getStreamConsumerFactoryClassName());
- completeTableConfiguration();
- }
-
- @Override
- public String getTableName() {
- return _currentTableName;
- }
-
- private void setTableName(String tableName) {
- _currentTableName = tableName;
- }
- /**
- * After 1 run of SegmentStatusChecker the controllerMetrics will be set for each table
- * Validate that we are seeing the expected numbers
- */
- @Test
- public void testSegmentStatusChecker() {
- ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
-
- long millisToWait = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES);
- while (controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
- "SegmentStatusChecker") < NUM_TABLES && millisToWait > 0) {
- try {
- Thread.sleep(1000);
- millisToWait -= 1000;
- } catch (InterruptedException e) {
- LOGGER.info("Interrupted while waiting for SegmentStatusChecker");
- }
- }
-
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.PERIODIC_TASK_NUM_TABLES_PROCESSED,
- "SegmentStatusChecker"), NUM_TABLES);
-
- // empty table - table1_OFFLINE
- // num replicas set from ideal state
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.NUMBER_OF_REPLICAS), 3);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_OF_REPLICAS), 100);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(emptyTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
- 100);
-
- // disabled table - table2_OFFLINE
- // reset to defaults
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
- Long.MIN_VALUE);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
- Long.MIN_VALUE);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE),
- Long.MIN_VALUE);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(disabledOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE),
- Long.MIN_VALUE);
-
- // happy path table - table3_OFFLINE
- IdealState idealState = _helixResourceManager.getTableIdealState(basicOfflineTable);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
- idealState.toString().length());
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENT_COUNT),
- (long) (idealState.getPartitionSet().size()));
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
- 3);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
- 100);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(basicOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-
- // offline segments - table4_OFFLINE
- // 2 replicas available out of 3, percent 66
- idealState = _helixResourceManager.getTableIdealState(errorOfflineTable);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
- idealState.toString().length());
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENT_COUNT),
- (long) (idealState.getPartitionSet().size()));
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.NUMBER_OF_REPLICAS),
- 2);
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_OF_REPLICAS),
- 66);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.SEGMENTS_IN_ERROR_STATE), 0);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(errorOfflineTable, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 100);
-
- // error segments - table5_REALTIME
- // no replicas available, all segments in error state
- idealState = _helixResourceManager.getTableIdealState(realtimeTableErrorState);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.IDEALSTATE_ZNODE_SIZE),
- idealState.toString().length());
- Assert.assertEquals(controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENT_COUNT),
- (long) (idealState.getPartitionSet().size()));
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.NUMBER_OF_REPLICAS), 0);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_OF_REPLICAS), 0);
- Assert.assertTrue(
- controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.SEGMENTS_IN_ERROR_STATE) > 0);
- Assert.assertEquals(
- controllerMetrics.getValueOfTableGauge(realtimeTableErrorState, ControllerGauge.PERCENT_SEGMENTS_AVAILABLE), 0);
-
- // Total metrics
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.OFFLINE_TABLE_COUNT), 4);
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.REALTIME_TABLE_COUNT), 1);
- Assert.assertEquals(controllerMetrics.getValueOfGlobalGauge(ControllerGauge.DISABLED_TABLE_COUNT), 1);
- }
-
- @Override
- protected boolean isUsingNewConfigFormat() {
- return true;
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- dropRealtimeTable(realtimeTableErrorState);
- dropOfflineTable(emptyTable);
- dropOfflineTable(disabledOfflineTable);
- dropOfflineTable(basicOfflineTable);
- dropOfflineTable(errorOfflineTable);
-
- stopServer();
- stopBroker();
- stopController();
- stopZk();
- FileUtils.deleteDirectory(_tempDir);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]