This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new af75788 Adding support for Map type fields (#4388)
af75788 is described below
commit af7578882e6149257396cf7d89d1e06630a34e4a
Author: Kishore Gopalakrishna <[email protected]>
AuthorDate: Thu Aug 8 19:07:54 2019 -0700
Adding support for Map type fields (#4388)
* Adding UDF to support MAP type
* Minor changes to extraction logic. Adding additional tests
* Adding UDF to support MAP type
* Minor changes to extraction logic. Adding additional tests
* Addressing review comments
* Fixing compilation issue
* Update tests.
---
.../pinot/core/data/readers/AvroRecordReader.java | 3 +-
.../function/MapValueTransformFunction.java | 95 +++++++++
.../function/TransformFunctionFactory.java | 2 +-
.../stream/AvroRecordToPinotRowGenerator.java | 6 +-
.../creator/impl/SegmentDictionaryCreator.java | 2 -
.../java/org/apache/pinot/core/util/AvroUtils.java | 35 ++++
.../function/ValueInTransformFunctionTest.java | 2 -
.../tests/MapTypeClusterIntegrationTest.java | 223 +++++++++++++++++++++
8 files changed, 359 insertions(+), 9 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
index dd6e131..12ec937 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/AvroRecordReader.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
@@ -82,7 +83,7 @@ public class AvroRecordReader implements RecordReader {
Object value = _reusableAvroRecord.get(fieldName);
// Allow default value for non-time columns
if (value != null || fieldSpec.getFieldType() !=
FieldSpec.FieldType.TIME) {
- reuse.putField(fieldName, RecordReaderUtils.convert(fieldSpec, value));
+ AvroUtils.extractField(fieldSpec, _reusableAvroRecord, reuse);
}
}
return reuse;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/MapValueTransformFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/MapValueTransformFunction.java
new file mode 100644
index 0000000..8053d3a
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/MapValueTransformFunction.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.operator.blocks.ProjectionBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * map_value(keyColName, 'keyName', valColName)
+ */
+public class MapValueTransformFunction extends BaseTransformFunction {
+
+ public static final String FUNCTION_NAME = "map_value";
+
+ private TransformResultMetadata _resultMetadata;
+ private TransformFunction _keyColumnFunction;
+ private String _keyName;
+ private TransformFunction _valueColumnFunction;
+ private int[][] _keyDictIds;
+ private int[][] _valueDictIds;
+ private int _inputKeyDictId;
+ private int[] _outputValueDictIds;
+
+ @Override
+ public String getName() {
+ return FUNCTION_NAME;
+ }
+
+ @Override
+ public void init(@Nonnull List<TransformFunction> arguments, @Nonnull
Map<String, DataSource> dataSourceMap) {
+ int numArguments = arguments.size();
+ if (numArguments != 3) {
+ throw new IllegalArgumentException("3 arguments are required for
MAP_VALUE transform function map_value(keyColName, 'keyName', valColName)");
+ }
+ _keyColumnFunction = arguments.get(0);
+ _keyName = ((LiteralTransformFunction) arguments.get(1)).getLiteral();
+ _valueColumnFunction = arguments.get(2);
+ TransformResultMetadata valueColumnMetadata =
_valueColumnFunction.getResultMetadata();
+ _resultMetadata =
+ new TransformResultMetadata(valueColumnMetadata.getDataType(), true,
valueColumnMetadata.hasDictionary());
+ _inputKeyDictId = _keyColumnFunction.getDictionary().indexOf(_keyName);
+ }
+
+ @Override
+ public TransformResultMetadata getResultMetadata() {
+ return _resultMetadata;
+ }
+
+ @Override
+ public int[] transformToDictIdsSV(@Nonnull ProjectionBlock projectionBlock) {
+ _outputValueDictIds = new int[DocIdSetPlanNode.MAX_DOC_PER_CALL];
+
+ _keyDictIds = _keyColumnFunction.transformToDictIdsMV(projectionBlock);
+ _valueDictIds = _valueColumnFunction.transformToDictIdsMV(projectionBlock);
+ int length = projectionBlock.getNumDocs();
+ for (int i = 0; i < length; i++) {
+ int numKeys = _keyDictIds[i].length;
+ for (int j = 0; j < numKeys; j++) {
+ if (_keyDictIds[i][j] == _inputKeyDictId) {
+ _outputValueDictIds[i] = _valueDictIds[i][j];
+ break;
+ }
+ }
+ }
+ return _outputValueDictIds;
+ }
+
+ @Override
+ public Dictionary getDictionary() {
+ return _valueColumnFunction.getDictionary();
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index c8c3808..fb89bcd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -49,13 +49,13 @@ public class TransformFunctionFactory {
put(DateTimeConversionTransformFunction.FUNCTION_NAME.toLowerCase(),
DateTimeConversionTransformFunction.class);
put(ValueInTransformFunction.FUNCTION_NAME.toLowerCase(),
ValueInTransformFunction.class);
-
put(AbsTransformFunction.FUNCTION_NAME.toLowerCase(),
AbsTransformFunction.class);
put(CeilTransformFunction.FUNCTION_NAME.toLowerCase(),
CeilTransformFunction.class);
put(ExpTransformFunction.FUNCTION_NAME.toLowerCase(),
ExpTransformFunction.class);
put(FloorTransformFunction.FUNCTION_NAME.toLowerCase(),
FloorTransformFunction.class);
put(LnTransformFunction.FUNCTION_NAME.toLowerCase(),
LnTransformFunction.class);
put(SqrtTransformFunction.FUNCTION_NAME.toLowerCase(),
SqrtTransformFunction.class);
+ put(MapValueTransformFunction.FUNCTION_NAME.toLowerCase(),
MapValueTransformFunction.class);
}
};
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/AvroRecordToPinotRowGenerator.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/AvroRecordToPinotRowGenerator.java
index 9881040..cd4f824 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/AvroRecordToPinotRowGenerator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/AvroRecordToPinotRowGenerator.java
@@ -24,10 +24,11 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.core.data.GenericRow;
-import org.apache.pinot.core.data.readers.RecordReaderUtils;
+import org.apache.pinot.core.util.AvroUtils;
public class AvroRecordToPinotRowGenerator {
+
private final Schema _schema;
private final FieldSpec _incomingTimeFieldSpec;
@@ -44,8 +45,7 @@ public class AvroRecordToPinotRowGenerator {
for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
FieldSpec incomingFieldSpec =
fieldSpec.getFieldType() == FieldSpec.FieldType.TIME ?
_incomingTimeFieldSpec : fieldSpec;
- String fieldName = incomingFieldSpec.getName();
- to.putField(fieldName, RecordReaderUtils.convert(incomingFieldSpec,
from.get(fieldName)));
+ AvroUtils.extractField(incomingFieldSpec, from, to);
}
return to;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
index 105c5fa..81cbfa7 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentDictionaryCreator.java
@@ -295,8 +295,6 @@ public class SegmentDictionaryCreator implements Closeable {
default:
throw new UnsupportedOperationException("Unsupported data type : " +
_fieldSpec.getDataType());
}
-
- Arrays.sort(indexes);
return indexes;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
index 6930b82..ffe31a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/AvroUtils.java
@@ -22,8 +22,11 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import javax.annotation.Nonnull;
@@ -38,12 +41,16 @@ import org.apache.pinot.common.data.FieldSpec;
import org.apache.pinot.common.data.MetricFieldSpec;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.data.TimeFieldSpec;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.readers.RecordReaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AvroUtils {
private static final Logger LOGGER =
LoggerFactory.getLogger(AvroUtils.class);
+ public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS";
+ public static final String MAP_VALUE_COLUMN_SUFFIX = "__VALUES";
private AvroUtils() {
}
@@ -296,4 +303,32 @@ public class AvroUtils {
return fieldSchema;
}
}
+
+ public static void extractField(FieldSpec fieldSpec, GenericRecord from,
GenericRow to) {
+ String fieldName = fieldSpec.getName();
+ //Handle MAP types
+ if (fieldName.toUpperCase().endsWith(MAP_KEY_COLUMN_SUFFIX)) {
+ String avroFieldName = fieldName.replaceAll(MAP_KEY_COLUMN_SUFFIX, "");
+ Object o = from.get(avroFieldName);
+ if (o instanceof Map) {
+ Map map = (Map) o;
+ TreeSet sortedKeySet = new TreeSet(map.keySet());
+ to.putField(fieldName, RecordReaderUtils.convert(fieldSpec,
sortedKeySet));
+ }
+ } else if (fieldName.toUpperCase().endsWith(MAP_VALUE_COLUMN_SUFFIX)) {
+ String avroFieldName = fieldName.replaceAll(MAP_VALUE_COLUMN_SUFFIX, "");
+ Object o = from.get(avroFieldName);
+ if (o instanceof Map) {
+ Map map = (Map) o;
+ TreeSet sortedKeySet = new TreeSet(map.keySet());
+ List values = new ArrayList(map.size());
+ for (Object key : sortedKeySet) {
+ values.add(map.get(key));
+ }
+ to.putField(fieldName, RecordReaderUtils.convert(fieldSpec, values));
+ }
+ } else {
+ to.putField(fieldName, RecordReaderUtils.convert(fieldSpec,
from.get(fieldName)));
+ }
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ValueInTransformFunctionTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ValueInTransformFunctionTest.java
index 0224a1f..30b31db 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ValueInTransformFunctionTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/ValueInTransformFunctionTest.java
@@ -54,8 +54,6 @@ public class ValueInTransformFunctionTest extends
BaseTransformFunctionTest {
}
}
int[] expectedValues = expectedList.toIntArray();
- // NOTE: need to sort the expected array because we sort the dictionary
Ids for multi-valued entries
- Arrays.sort(expectedValues);
int numValues = expectedValues.length;
for (int j = 0; j < numValues; j++) {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
new file mode 100644
index 0000000..d89b780
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nonnull;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.FieldSpec.DataType;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Lists;
+
+
+public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
+
+ protected static final String DEFAULT_TABLE_NAME = "myTable";
+ static final long TOTAL_DOCS = 1_000L;
+
+ protected Schema _schema;
+
+ private String _currentTable;
+
+ @Nonnull
+ @Override
+ protected String getTableName() {
+ return _currentTable;
+ }
+
+ @Nonnull
+ @Override
+ protected String getSchemaFileName() {
+ return "";
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);
+
+ // Start the Pinot cluster
+ startZk();
+ startController();
+ startBroker();
+ startServers(1);
+
+ _schema = new Schema();
+ FieldSpec keyFieldSpec = new DimensionFieldSpec();
+ keyFieldSpec.setDataType(DataType.STRING);
+ keyFieldSpec.setDefaultNullValue("");
+ keyFieldSpec.setName("myMap__KEYS");
+ keyFieldSpec.setSingleValueField(false);
+ _schema.addField(keyFieldSpec);
+ FieldSpec valueFieldSpec = new DimensionFieldSpec();
+ valueFieldSpec.setDataType(DataType.STRING);
+ valueFieldSpec.setDefaultNullValue("");
+ valueFieldSpec.setName("myMap__VALUES");
+ valueFieldSpec.setSingleValueField(false);
+ _schema.addField(valueFieldSpec);
+
+ // Create the tables
+ ArrayList<String> invertedIndexColumns = Lists.newArrayList();
+ addOfflineTable(DEFAULT_TABLE_NAME, null, null, null, null, null,
SegmentVersion.v1, invertedIndexColumns, null,
+ null, null, null);
+
+ setUpSegmentsAndQueryGenerator();
+
+ // Wait for all documents loaded
+ _currentTable = DEFAULT_TABLE_NAME;
+ waitForAllDocsLoaded(60_000);
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ return TOTAL_DOCS;
+ }
+
+ protected void setUpSegmentsAndQueryGenerator()
+ throws Exception {
+
+ org.apache.avro.Schema mapAvroSchema =
org.apache.avro.Schema.createMap(org.apache.avro.Schema.create(Type.STRING));
+ List<Field> fields = new ArrayList<>();
+ fields.add(new Field("myMap", mapAvroSchema, "", null));
+ org.apache.avro.Schema avroSchema =
org.apache.avro.Schema.createRecord("myRecord", "some desc", null, false);
+ avroSchema.setFields(fields);
+
+ DataFileWriter<GenericData.Record> recordWriter =
+ new DataFileWriter<>(new
GenericDatumWriter<GenericData.Record>(avroSchema));
+ String parent = "/tmp/mapTest";
+ File avroFile = new File(parent, "part-" + 0 + ".avro");
+ avroFile.getParentFile().mkdirs();
+ recordWriter.create(avroSchema, avroFile);
+ for (int i = 0; i < TOTAL_DOCS; i++) {
+ Map<String, String> map = new HashMap<>();
+ map.put("k1", "value-k1-" + i);
+ map.put("k2", "value-k2-" + i);
+ GenericData.Record record = new GenericData.Record(avroSchema);
+ record.put("myMap", map);
+ recordWriter.append(record);
+ }
+ recordWriter.close();
+
+ // Unpack the Avro files
+ List<File> avroFiles = Lists.newArrayList(avroFile);
+
+ // Create and upload segments without star tree indexes from Avro data
+ createAndUploadSegments(avroFiles, DEFAULT_TABLE_NAME, false);
+ }
+
+ private void createAndUploadSegments(List<File> avroFiles, String tableName,
boolean createStarTreeIndex)
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ ClusterIntegrationTestUtils
+ .buildSegmentsFromAvro(avroFiles, 0, _segmentDir, _tarDir, tableName,
createStarTreeIndex, null, null, _schema,
+ executor);
+ executor.shutdown();
+ executor.awaitTermination(10, TimeUnit.MINUTES);
+
+ uploadSegments(getTableName(), _tarDir);
+ }
+
+ @Test
+ public void testQueries()
+ throws Exception {
+
+ //Selection Query
+ String pqlQuery = "Select map_value(myMap__KEYS, 'k1', myMap__VALUES) from
" + DEFAULT_TABLE_NAME;
+ JsonNode pinotResponse = postQuery(pqlQuery);
+ ArrayNode selectionResults = (ArrayNode)
pinotResponse.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ for (int i = 0; i < selectionResults.size(); i++) {
+ String value = selectionResults.get(i).get(0).textValue();
+ Assert.assertTrue(value.indexOf("-k1-") > 0);
+ }
+
+ //Filter Query
+ pqlQuery = "Select map_value(myMap__KEYS, 'k1', myMap__VALUES) from " +
DEFAULT_TABLE_NAME
+ + " where map_value(myMap__KEYS, 'k1', myMap__VALUES) = 'value-k1-0'";
+ pinotResponse = postQuery(pqlQuery);
+ selectionResults = (ArrayNode)
pinotResponse.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ for (int i = 0; i < selectionResults.size(); i++) {
+ String value = selectionResults.get(i).get(0).textValue();
+ Assert.assertEquals(value, "value-k1-0");
+ }
+
+ //selection order by
+ pqlQuery = "Select map_value(myMap__KEYS, 'k1', myMap__VALUES) from " +
DEFAULT_TABLE_NAME
+ + " order by map_value(myMap__KEYS, 'k1', myMap__VALUES)";
+ pinotResponse = postQuery(pqlQuery);
+ selectionResults = (ArrayNode)
pinotResponse.get("selectionResults").get("results");
+ Assert.assertNotNull(selectionResults);
+ Assert.assertTrue(selectionResults.size() > 0);
+ for (int i = 0; i < selectionResults.size(); i++) {
+ String value = selectionResults.get(i).get(0).textValue();
+ Assert.assertTrue(value.indexOf("-k1-") > 0);
+ }
+
+ //Group By Query
+ pqlQuery = "Select count(*) from " + DEFAULT_TABLE_NAME + " group by
map_value(myMap__KEYS, 'k1', myMap__VALUES)";
+ pinotResponse = postQuery(pqlQuery);
+ Assert.assertNotNull(pinotResponse.get("aggregationResults"));
+ JsonNode groupByResult =
pinotResponse.get("aggregationResults").get(0).get("groupByResult");
+ Assert.assertNotNull(groupByResult);
+ Assert.assertTrue(groupByResult.isArray());
+ Assert.assertTrue(groupByResult.size() > 0);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ dropOfflineTable(DEFAULT_TABLE_NAME);
+
+ stopServer();
+ stopBroker();
+ stopController();
+ stopZk();
+
+ FileUtils.deleteDirectory(_tempDir);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]