This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 079b86d  Config for overriding initial rows threshold in segment size 
auto tuning (#4376)
079b86d is described below

commit 079b86d897afb76a4328cb5a057282676b6a2213
Author: Neha Pawar <[email protected]>
AuthorDate: Fri Jul 12 11:09:49 2019 -0700

    Config for overriding initial rows threshold in segment size auto tuning 
(#4376)
---
 .../segment/FlushThresholdUpdateManager.java       | 11 ++---
 .../SegmentSizeBasedFlushThresholdUpdater.java     | 14 +++---
 .../segment/FlushThresholdUpdaterTest.java         | 28 +++++++----
 .../pinot/core/realtime/stream/StreamConfig.java   | 55 ++++++++++++++++------
 .../realtime/stream/StreamConfigProperties.java    |  8 ++--
 5 files changed, 76 insertions(+), 40 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
index a5c44ab..e7cf2e7 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdateManager.java
@@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory;
  */
 public class FlushThresholdUpdateManager {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(FlushThresholdUpdateManager.class);
-
   private ConcurrentMap<String, FlushThresholdUpdater> 
_flushThresholdUpdaterMap = new ConcurrentHashMap<>();
 
   /**
@@ -41,8 +39,6 @@ public class FlushThresholdUpdateManager {
    * If flush size < 0, create a new DefaultFlushThresholdUpdater with default 
flush size
    * If flush size > 0, create a new DefaultFlushThresholdUpdater with given 
flush size.
    * If flush size == 0, create new SegmentSizeBasedFlushThresholdUpdater if 
not already created. Create only 1 per table, because we want to maintain 
tuning information for the table in the updater
-   * @param realtimeTableConfig
-   * @return
    */
   public FlushThresholdUpdater getFlushThresholdUpdater(TableConfig 
realtimeTableConfig) {
     final String tableName = realtimeTableConfig.getTableName();
@@ -50,11 +46,12 @@ public class FlushThresholdUpdateManager {
         new 
PartitionLevelStreamConfig(realtimeTableConfig.getIndexingConfig().getStreamConfigs());
 
     final int tableFlushSize = streamConfig.getFlushThresholdRows();
-    final long desiredSegmentSize = 
streamConfig.getFlushSegmentDesiredSizeBytes();
 
     if (tableFlushSize == 0) {
-      return _flushThresholdUpdaterMap
-          .computeIfAbsent(tableName, k -> new 
SegmentSizeBasedFlushThresholdUpdater(desiredSegmentSize));
+      final long desiredSegmentSize = 
streamConfig.getFlushSegmentDesiredSizeBytes();
+      final int flushAutotuneInitialRows = 
streamConfig.getFlushAutotuneInitialRows();
+      return _flushThresholdUpdaterMap.computeIfAbsent(tableName,
+          k -> new SegmentSizeBasedFlushThresholdUpdater(desiredSegmentSize, 
flushAutotuneInitialRows));
     } else {
       _flushThresholdUpdaterMap.remove(tableName);
       return new DefaultFlushThresholdUpdater(tableFlushSize);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
index 116280b..d3c40c4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
@@ -39,14 +39,13 @@ public class SegmentSizeBasedFlushThresholdUpdater 
implements FlushThresholdUpda
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
 
-  private static final int INITIAL_ROWS_THRESHOLD = 100_000;
-
   private static final double CURRENT_SEGMENT_RATIO_WEIGHT = 0.1;
   private static final double PREVIOUS_SEGMENT_RATIO_WEIGHT = 0.9;
   private static final double ROWS_MULTIPLIER_WHEN_TIME_THRESHOLD_HIT = 1.1;
   private static final int MINIMUM_NUM_ROWS_THRESHOLD = 10_000;
 
   private final long _desiredSegmentSizeBytes;
+  private final int _autotuneInitialRows;
 
   /** Below this size, we double the rows threshold */
   private final double _optimalSegmentSizeBytesMin;
@@ -54,8 +53,8 @@ public class SegmentSizeBasedFlushThresholdUpdater implements 
FlushThresholdUpda
   private final double _optimalSegmentSizeBytesMax;
 
   @VisibleForTesting
-  int getInitialRowsThreshold() {
-    return INITIAL_ROWS_THRESHOLD;
+  int getAutotuneInitialRows() {
+    return _autotuneInitialRows;
   }
 
   @VisibleForTesting
@@ -81,10 +80,11 @@ public class SegmentSizeBasedFlushThresholdUpdater 
implements FlushThresholdUpda
   // num rows to segment size ratio of last committed segment for this table
   private double _latestSegmentRowsToSizeRatio = 0;
 
-  public SegmentSizeBasedFlushThresholdUpdater(long desiredSegmentSizeBytes) {
+  public SegmentSizeBasedFlushThresholdUpdater(long desiredSegmentSizeBytes, 
int autotuneInitialRows) {
     _desiredSegmentSizeBytes = desiredSegmentSizeBytes;
     _optimalSegmentSizeBytesMin = _desiredSegmentSizeBytes / 2;
     _optimalSegmentSizeBytesMax = _desiredSegmentSizeBytes * 1.5;
+    _autotuneInitialRows = autotuneInitialRows;
   }
 
   // synchronized since this method could be called for multiple partitions of 
the same table in different threads
@@ -104,8 +104,8 @@ public class SegmentSizeBasedFlushThresholdUpdater 
implements FlushThresholdUpda
         newSegmentZKMetadata.setSizeThresholdToFlushSegment((int) 
targetSegmentNumRows);
       } else {
         LOGGER.info("Committing segment zk metadata is not available, setting 
threshold for {} as {}", newSegmentName,
-            INITIAL_ROWS_THRESHOLD);
-        
newSegmentZKMetadata.setSizeThresholdToFlushSegment(INITIAL_ROWS_THRESHOLD);
+            _autotuneInitialRows);
+        
newSegmentZKMetadata.setSizeThresholdToFlushSegment(_autotuneInitialRows);
       }
       return;
     }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
index 425ebff..b539565 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
@@ -38,6 +38,7 @@ import org.testng.annotations.Test;
 
 public class FlushThresholdUpdaterTest {
   private static final long DESIRED_SEGMENT_SIZE = 
StreamConfig.getDefaultDesiredSegmentSizeBytes();
+  private static final int DEFAULT_INITIAL_ROWS_THRESHOLD = 
StreamConfig.getDefaultFlushAutotuneInitialRows();
   private Random _random;
   private Map<String, double[][]> datasetGraph;
 
@@ -155,7 +156,7 @@ public class FlushThresholdUpdaterTest {
     for (Map.Entry<String, double[][]> entry : datasetGraph.entrySet()) {
 
       SegmentSizeBasedFlushThresholdUpdater 
segmentSizeBasedFlushThresholdUpdater =
-          new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+          new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE, 
DEFAULT_INITIAL_ROWS_THRESHOLD);
 
       double[][] numRowsToSegmentSize = entry.getValue();
 
@@ -185,7 +186,7 @@ public class FlushThresholdUpdaterTest {
       segmentSizeBasedFlushThresholdUpdater
           .updateFlushThreshold(newSegmentMetadata, null, 
committingSegmentDescriptor, null);
       Assert.assertEquals(newSegmentMetadata.getSizeThresholdToFlushSegment(),
-          segmentSizeBasedFlushThresholdUpdater.getInitialRowsThreshold());
+          segmentSizeBasedFlushThresholdUpdater.getAutotuneInitialRows());
 
       System.out.println("NumRowsThreshold, SegmentSize");
       for (int run = 0; run < numRuns; run++) {
@@ -310,7 +311,7 @@ public class FlushThresholdUpdaterTest {
     Assert.assertEquals(flushThresholdUpdater.getClass(), 
DefaultFlushThresholdUpdater.class);
     Assert.assertEquals(((DefaultFlushThresholdUpdater) 
flushThresholdUpdater).getTableFlushSize(), 20000);
 
-    // optimal segment size set to invalid value. Defailt remains the same.
+    // optimal segment size set to invalid value. Default remains the same.
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, 
"0");
     streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_DESIRED_SIZE, 
"Invalid");
     realtimeTableConfig = tableConfigBuilder.build();
@@ -332,6 +333,16 @@ public class FlushThresholdUpdaterTest {
     flushThresholdUpdater = 
manager.getFlushThresholdUpdater(realtimeTableConfig);
     Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater) 
(flushThresholdUpdater)).getDesiredSegmentSizeBytes(),
         desiredSegSize);
+    Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater) 
(flushThresholdUpdater)).getAutotuneInitialRows(),
+        DEFAULT_INITIAL_ROWS_THRESHOLD);
+
+    // initial rows threshold
+    
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS, 
"500000");
+    realtimeTableConfig = tableConfigBuilder.build();
+    FlushThresholdUpdateManager newManager = new FlushThresholdUpdateManager();
+    flushThresholdUpdater = 
newManager.getFlushThresholdUpdater(realtimeTableConfig);
+    Assert.assertEquals(((SegmentSizeBasedFlushThresholdUpdater) 
(flushThresholdUpdater)).getAutotuneInitialRows(),
+        500_000);
   }
 
   /**
@@ -368,7 +379,8 @@ public class FlushThresholdUpdaterTest {
     Assert.assertNull(metadata0.getTimeThresholdToFlushSegment());
 
     // before committing segment, we switched to size based updation - verify 
that new thresholds are set as per size based strategy
-    flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+    flushThresholdUpdater = new 
SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE,
+        DEFAULT_INITIAL_ROWS_THRESHOLD);
 
     startOffset += 1000;
     updateCommittingSegmentMetadata(metadata0, startOffset, 250_000);
@@ -407,10 +419,10 @@ public class FlushThresholdUpdaterTest {
     // initial segment
     LLCRealtimeSegmentZKMetadata metadata0 = getNextSegmentMetadata(tableName, 
startOffset, partitionId, seqNum++);
     SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE, 
DEFAULT_INITIAL_ROWS_THRESHOLD);
     committingSegmentDescriptor = new 
CommittingSegmentDescriptor(metadata0.getSegmentName(), startOffset, 0);
     flushThresholdUpdater.updateFlushThreshold(metadata0, null, 
committingSegmentDescriptor, null);
-    Assert.assertEquals(metadata0.getSizeThresholdToFlushSegment(), 
flushThresholdUpdater.getInitialRowsThreshold());
+    Assert.assertEquals(metadata0.getSizeThresholdToFlushSegment(), 
flushThresholdUpdater.getAutotuneInitialRows());
 
     // next segment hit time threshold
     startOffset += 1000;
@@ -451,7 +463,7 @@ public class FlushThresholdUpdaterTest {
     LLCRealtimeSegmentZKMetadata metadata0 = getNextSegmentMetadata(tableName, 
startOffset, partitionId, seqNum++);
     metadata0.setSegmentName(seg0SegmentName.getSegmentName());
     SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE, 
DEFAULT_INITIAL_ROWS_THRESHOLD);
     committingSegmentDescriptor =
         new CommittingSegmentDescriptor(seg0SegmentName.getSegmentName(), 
startOffset, 10_000);
     metadata0.setTotalRawDocs(15);
@@ -486,7 +498,7 @@ public class FlushThresholdUpdaterTest {
     long seg0time = now - 1334_650;
     long seg1time = seg0time + 14_000;
     SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
-        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE);
+        new SegmentSizeBasedFlushThresholdUpdater(DESIRED_SEGMENT_SIZE, 
DEFAULT_INITIAL_ROWS_THRESHOLD);
 
     // Initial update is from partition 1
     LLCSegmentName seg0SegmentName = new LLCSegmentName(tableName, 1, seqNum, 
seg0time);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
index 1d37534..ad65660 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfig.java
@@ -47,12 +47,13 @@ public class StreamConfig {
   private static final int DEFAULT_FLUSH_THRESHOLD_ROWS = 5_000_000;
   private static final long DEFAULT_FLUSH_THRESHOLD_TIME = 
TimeUnit.MILLISECONDS.convert(6, TimeUnit.HOURS);
   private static final long DEFAULT_DESIRED_SEGMENT_SIZE_BYTES = 200 * 1024 * 
1024; // 200M
+  private static final int DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS = 100_000;
   private static final String DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING =
       "org.apache.pinot.core.realtime.impl.kafka.KafkaConsumerFactory";
 
-  protected static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 
30_000;
-  protected static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
-  protected static final String SIMPLE_CONSUMER_TYPE_STRING = "simple";
+  static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000;
+  static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
+  private static final String SIMPLE_CONSUMER_TYPE_STRING = "simple";
 
   final private String _type;
   final private String _topicName;
@@ -68,6 +69,7 @@ public class StreamConfig {
   final private int _flushThresholdRows;
   final private long _flushThresholdTimeMillis;
   final private long _flushSegmentDesiredSizeBytes;
+  final private int _flushAutotuneInitialRows; // initial num rows to use for 
SegmentSizeBasedFlushThresholdUpdater
 
   final private String _groupId;
 
@@ -135,8 +137,8 @@ public class StreamConfig {
       try {
         connectionTimeoutMillis = Long.parseLong(connectionTimeoutValue);
       } catch (Exception e) {
-        LOGGER.warn("Caught exception while parsing the connection timeout, 
defaulting to {} ms", e,
-            DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS);
+        LOGGER.warn("Caught exception while parsing the connection timeout, 
defaulting to {} ms",
+            DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS, e);
       }
     }
     _connectionTimeoutMillis = connectionTimeoutMillis;
@@ -197,6 +199,19 @@ public class StreamConfig {
       _flushSegmentDesiredSizeBytes = DEFAULT_DESIRED_SEGMENT_SIZE_BYTES;
     }
 
+    int autotuneInitialRows = 0;
+    String initialRowsValue = 
streamConfigMap.get(StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS);
+    if (initialRowsValue != null) {
+      try {
+        autotuneInitialRows = Integer.parseInt(initialRowsValue);
+      } catch (Exception e) {
+        LOGGER.warn("Caught exception while parsing {}:{}, defaulting to {}",
+            StreamConfigProperties.SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS, 
initialRowsValue,
+            DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS, e);
+      }
+    }
+    _flushAutotuneInitialRows = autotuneInitialRows > 0 ? autotuneInitialRows 
: DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
+
     String groupIdKey = StreamConfigProperties.constructStreamProperty(_type, 
StreamConfigProperties.GROUP_ID);
     _groupId = streamConfigMap.get(groupIdKey);
 
@@ -227,10 +242,22 @@ public class StreamConfig {
     return _consumerFactoryClassName;
   }
 
+  public static String getDefaultConsumerFactoryClassName() {
+    return DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING;
+  }
+
   public OffsetCriteria getOffsetCriteria() {
     return _offsetCriteria;
   }
 
+  public String getDecoderClass() {
+    return _decoderClass;
+  }
+
+  public Map<String, String> getDecoderProperties() {
+    return _decoderProperties;
+  }
+
   public long getConnectionTimeoutMillis() {
     return _connectionTimeoutMillis;
   }
@@ -255,10 +282,6 @@ public class StreamConfig {
     return DEFAULT_FLUSH_THRESHOLD_TIME;
   }
 
-  public static String getDefaultConsumerFactoryClassName() {
-    return DEFAULT_CONSUMER_FACTORY_CLASS_NAME_STRING;
-  }
-
   public long getFlushSegmentDesiredSizeBytes() {
     return _flushSegmentDesiredSizeBytes;
   }
@@ -267,12 +290,12 @@ public class StreamConfig {
     return DEFAULT_DESIRED_SEGMENT_SIZE_BYTES;
   }
 
-  public String getDecoderClass() {
-    return _decoderClass;
+  public int getFlushAutotuneInitialRows() {
+    return _flushAutotuneInitialRows;
   }
 
-  public Map<String, String> getDecoderProperties() {
-    return _decoderProperties;
+  public static int getDefaultFlushAutotuneInitialRows() {
+    return DEFAULT_FLUSH_AUTOTUNE_INITIAL_ROWS;
   }
 
   public String getGroupId() {
@@ -290,8 +313,8 @@ public class StreamConfig {
         + _offsetCriteria + '\'' + ", _connectionTimeoutMillis=" + 
_connectionTimeoutMillis + ", _fetchTimeoutMillis="
         + _fetchTimeoutMillis + ", _flushThresholdRows=" + _flushThresholdRows 
+ ", _flushThresholdTimeMillis="
         + _flushThresholdTimeMillis + ", _flushSegmentDesiredSizeBytes=" + 
_flushSegmentDesiredSizeBytes
-        + ", _decoderClass='" + _decoderClass + '\'' + ", _decoderProperties=" 
+ _decoderProperties + ", _groupId='"
-        + _groupId + '}';
+        + ", _flushAutotuneInitialRows=" + _flushAutotuneInitialRows + ", 
_decoderClass='" + _decoderClass
+        + '\'' + ", _decoderProperties=" + _decoderProperties + ", _groupId='" 
+ _groupId + '}';
   }
 
   @Override
@@ -311,6 +334,7 @@ public class StreamConfig {
         .isEqual(_flushThresholdRows, that._flushThresholdRows) && 
EqualityUtils
         .isEqual(_flushThresholdTimeMillis, that._flushThresholdTimeMillis) && 
EqualityUtils
         .isEqual(_flushSegmentDesiredSizeBytes, 
that._flushSegmentDesiredSizeBytes) && EqualityUtils
+        .isEqual(_flushAutotuneInitialRows, that._flushAutotuneInitialRows) && 
EqualityUtils
         .isEqual(_type, that._type) && EqualityUtils.isEqual(_topicName, 
that._topicName) && EqualityUtils
         .isEqual(_consumerTypes, that._consumerTypes) && EqualityUtils
         .isEqual(_consumerFactoryClassName, that._consumerFactoryClassName) && 
EqualityUtils
@@ -331,6 +355,7 @@ public class StreamConfig {
     result = EqualityUtils.hashCodeOf(result, _flushThresholdRows);
     result = EqualityUtils.hashCodeOf(result, _flushThresholdTimeMillis);
     result = EqualityUtils.hashCodeOf(result, _flushSegmentDesiredSizeBytes);
+    result = EqualityUtils.hashCodeOf(result, _flushAutotuneInitialRows);
     result = EqualityUtils.hashCodeOf(result, _decoderClass);
     result = EqualityUtils.hashCodeOf(result, _decoderProperties);
     result = EqualityUtils.hashCodeOf(result, _groupId);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
index 6619213..5b4cbf1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/stream/StreamConfigProperties.java
@@ -89,14 +89,16 @@ public class StreamConfigProperties {
    */
   public static final String SEGMENT_FLUSH_DESIRED_SIZE = 
"realtime.segment.flush.desired.size";
 
+  /**
+   * The initial num rows to use for segment size auto tuning. By default 
100_000 is used.
+   */
+  public static final String SEGMENT_FLUSH_AUTOTUNE_INITIAL_ROWS = 
"realtime.segment.flush.autotune.initialRows";
+
   // Time threshold that controller will wait for the segment to be built by 
the server
   public static final String SEGMENT_COMMIT_TIMEOUT_SECONDS = 
"realtime.segment.commit.timeoutSeconds";
 
   /**
    * Helper method to create a stream specific property
-   * @param streamType
-   * @param property
-   * @return
    */
   public static String constructStreamProperty(String streamType, String 
property) {
     return Joiner.on(DOT_SEPARATOR).join(STREAM_PREFIX, streamType, property);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to