This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1bf5d02 RealtimeToOfflineSegments task generator (#6124)
1bf5d02 is described below
commit 1bf5d021db25fe09d00e31871b5572c149ea29e6
Author: Neha Pawar <[email protected]>
AuthorDate: Wed Oct 21 09:51:57 2020 -0700
RealtimeToOfflineSegments task generator (#6124)
Here's the final piece of the feature for Pinot managed offline flows.
This is the TaskGenerator which will create tasks of
realtimeToOfflineSegmentsTask type.
Typical use case:
You have setup a realtime table. You want to make it a hybrid table. Using
this feature, you don't have to write your offline flows. Just set
realtimeToOfflineSegmentsTask in your table config. Bring up minions. The
minion tasks will push data to the offline table, 1 day at a time.
The window size, buffer, segment processing configs are configurable via
table task config.
---
.../pinot/common/metadata/ZKMetadataProvider.java | 5 +
.../common/minion/MinionTaskMetadataUtils.java | 81 ++++
.../RealtimeToOfflineSegmentsTaskMetadata.java | 88 ++++
.../RealtimeToOfflineSegmentsTaskMetadataTest.java | 46 +++
...rInfoProvider.java => ClusterInfoAccessor.java} | 40 +-
.../helix/core/minion/PinotTaskManager.java | 10 +-
.../generator/ConvertToRawIndexTaskGenerator.java | 14 +-
.../RealtimeToOfflineSegmentsTaskGenerator.java | 312 ++++++++++++++
.../minion/generator/TaskGeneratorRegistry.java | 7 +-
.../core/minion/generator/TaskGeneratorUtils.java | 52 ++-
...RealtimeToOfflineSegmentsTaskGeneratorTest.java | 452 +++++++++++++++++++++
.../apache/pinot/core/common/MinionConstants.java | 26 +-
.../processing/framework/SegmentMapper.java | 12 +-
.../processing/framework/SegmentMapperTest.java | 2 +-
...fflineSegmentsMinionClusterIntegrationTest.java | 216 ++++++++++
.../tests/SimpleMinionClusterIntegrationTest.java | 12 +-
.../org/apache/pinot/minion/MinionStarter.java | 4 +-
.../BaseMultipleSegmentsConversionExecutor.java | 15 +
.../pinot/minion/executor/BaseTaskExecutor.java | 3 +-
.../executor/MinionTaskZkMetadataManager.java | 57 +++
.../RealtimeToOfflineSegmentsTaskExecutor.java | 88 +++-
...altimeToOfflineSegmentsTaskExecutorFactory.java | 12 +-
.../executor/TaskExecutorFactoryRegistry.java | 4 +-
.../RealtimeToOfflineSegmentsTaskExecutorTest.java | 95 +++--
24 files changed, 1548 insertions(+), 105 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index af5a5d6..9fa56fa 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -59,6 +59,7 @@ public class ZKMetadataProvider {
private static final String PROPERTYSTORE_INSTANCE_CONFIGS_PREFIX =
"/CONFIGS/INSTANCE";
private static final String PROPERTYSTORE_CLUSTER_CONFIGS_PREFIX =
"/CONFIGS/CLUSTER";
private static final String PROPERTYSTORE_SEGMENT_LINEAGE =
"/SEGMENT_LINEAGE";
+ private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX =
"/MINION_TASK_METADATA";
public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord>
propertyStore, String realtimeTableName,
ZNRecord znRecord) {
@@ -116,6 +117,10 @@ public class ZKMetadataProvider {
return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE,
tableNameWithType);
}
+ public static String constructPropertyStorePathForMinionTaskMetadata(String
taskType, String tableNameWithType) {
+ return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX,
taskType, tableNameWithType);
+ }
+
public static boolean isSegmentExisted(ZkHelixPropertyStore<ZNRecord>
propertyStore, String resourceNameForResource,
String segmentName) {
return propertyStore
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
new file mode 100644
index 0000000..43ac82c
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionTaskMetadataUtils.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import javax.annotation.Nullable;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.store.HelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.zookeeper.data.Stat;
+
+
+/**
+ * Helper methods to fetch/persist ZNRecord for minion task metadata
+ */
+public final class MinionTaskMetadataUtils {
+
+ private MinionTaskMetadataUtils() {
+
+ }
+
+ /**
+ * Fetches the ZNRecord for the given minion task and tableName, from
MINION_TASK_METADATA/taskType/tableNameWithType
+ */
+ @Nullable
+ public static ZNRecord
fetchMinionTaskMetadataZNRecord(HelixPropertyStore<ZNRecord> propertyStore,
String taskType,
+ String tableNameWithType) {
+ String path =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
tableNameWithType);
+ Stat stat = new Stat();
+ ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
+ if (znRecord != null) {
+ znRecord.setVersion(stat.getVersion());
+ }
+ return znRecord;
+ }
+
+ /**
+ * Fetches the ZNRecord for RealtimeToOfflineSegmentsTask for given
tableNameWithType from
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ * and converts it to a {@link RealtimeToOfflineSegmentsTaskMetadata} object
+ */
+ @Nullable
+ public static RealtimeToOfflineSegmentsTaskMetadata
getRealtimeToOfflineSegmentsTaskMetadata(
+ HelixPropertyStore<ZNRecord> propertyStore, String taskType, String
tableNameWithType) {
+ ZNRecord znRecord = fetchMinionTaskMetadataZNRecord(propertyStore,
taskType, tableNameWithType);
+ return znRecord != null ?
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord) : null;
+ }
+
+ /**
+ * Persists the provided {@link RealtimeToOfflineSegmentsTaskMetadata} to
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType.
+ * Will fail if expectedVersion does not match.
+ * Set expectedVersion -1 to override version check.
+ */
+ public static void
persistRealtimeToOfflineSegmentsTaskMetadata(HelixPropertyStore<ZNRecord>
propertyStore,
+ String taskType, RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata,
+ int expectedVersion) {
+ String path =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
+ realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType());
+ if (!propertyStore
+ .set(path, realtimeToOfflineSegmentsTaskMetadata.toZNRecord(),
expectedVersion, AccessOption.PERSISTENT)) {
+ throw new ZkException(
+ "Failed to persist minion RealtimeToOfflineSegmentsTask metadata: "
+ realtimeToOfflineSegmentsTaskMetadata);
+ }
+ }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
new file mode 100644
index 0000000..2bd9c4c
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/minion/RealtimeToOfflineSegmentsTaskMetadata.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Metadata for the minion task of type
<code>RealtimeToOfflineSegmentsTask</code>.
+ * The <code>watermarkMs</code> denotes the time (exclusive) up to which tasks
have been executed.
+ *
+ * This gets serialized and stored in zookeeper under the path
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ *
+ * PinotTaskGenerator:
+ * The <code>watermarkMs</code> is used by the
<code>RealtimeToOfflineSegmentsTaskGenerator</code>,
+ * to determine the window of execution for the task it is generating.
+ * The window of execution will be [watermarkMs, watermarkMs + bucketSize)
+ *
+ * PinotTaskExecutor:
+ * The same watermark is used by the
<code>RealtimeToOfflineSegmentsTaskExecutor</code>, to:
+ * - Verify that it is running the latest task scheduled by the task generator
+ * - Update the watermark as the end of the window that it executed for
+ */
+public class RealtimeToOfflineSegmentsTaskMetadata {
+
+ private static final String WATERMARK_KEY = "watermarkMs";
+
+ private final String _tableNameWithType;
+ private final long _watermarkMs;
+
+ public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long
watermarkMs) {
+ _tableNameWithType = tableNameWithType;
+ _watermarkMs = watermarkMs;
+ }
+
+ public String getTableNameWithType() {
+ return _tableNameWithType;
+ }
+
+ /**
+ * Get the watermark in millis
+ */
+ public long getWatermarkMs() {
+ return _watermarkMs;
+ }
+
+ public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord
znRecord) {
+ long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
+ return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(),
watermark);
+ }
+
+ public ZNRecord toZNRecord() {
+ ZNRecord znRecord = new ZNRecord(_tableNameWithType);
+ znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
+ return znRecord;
+ }
+
+ public String toJsonString() {
+ try {
+ return JsonUtils.objectToString(this);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJsonString();
+ }
+}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
new file mode 100644
index 0000000..e5a4db2
--- /dev/null
+++
b/pinot-common/src/test/java/org/apache/pinot/common/metadata/RealtimeToOfflineSegmentsTaskMetadataTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata;
+
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for converting to and from ZNRecord to {@link
RealtimeToOfflineSegmentsTaskMetadata}
+ */
+public class RealtimeToOfflineSegmentsTaskMetadataTest {
+
+ @Test
+ public void testToFromZNRecord() {
+ RealtimeToOfflineSegmentsTaskMetadata metadata =
+ new RealtimeToOfflineSegmentsTaskMetadata("testTable_REALTIME", 1000);
+ ZNRecord znRecord = metadata.toZNRecord();
+ assertEquals(znRecord.getId(), "testTable_REALTIME");
+ assertEquals(znRecord.getSimpleField("watermarkMs"), "1000");
+
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
+ RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(znRecord);
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getTableNameWithType(),
"testTable_REALTIME");
+ assertEquals(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), 1000);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
similarity index 70%
rename from
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
rename to
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
index 678d10d..8d3db71 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoProvider.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java
@@ -23,11 +23,15 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
+import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -37,12 +41,12 @@ import org.apache.pinot.spi.data.Schema;
* The class <code>ClusterInfoProvider</code> is an abstraction on top of
{@link PinotHelixResourceManager} and
* {@link PinotHelixTaskResourceManager} which provides cluster information
for {@link PinotTaskGenerator}.
*/
-public class ClusterInfoProvider {
+public class ClusterInfoAccessor {
private final PinotHelixResourceManager _pinotHelixResourceManager;
private final PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
private final ControllerConf _controllerConf;
- public ClusterInfoProvider(PinotHelixResourceManager
pinotHelixResourceManager,
+ public ClusterInfoAccessor(PinotHelixResourceManager
pinotHelixResourceManager,
PinotHelixTaskResourceManager pinotHelixTaskResourceManager,
ControllerConf controllerConf) {
_pinotHelixResourceManager = pinotHelixResourceManager;
_pinotHelixTaskResourceManager = pinotHelixTaskResourceManager;
@@ -94,6 +98,38 @@ public class ClusterInfoProvider {
}
/**
+ * Get all segment metadata for the given low-level REALTIME table name.
+ *
+ * @param tableName Table name with or without REALTIME type suffix
+ * @return List of segment metadata
+ */
+ public List<LLCRealtimeSegmentZKMetadata>
getLLCRealtimeSegmentsMetadata(String tableName) {
+ return ZKMetadataProvider
+
.getLLCRealtimeSegmentZKMetadataListForTable(_pinotHelixResourceManager.getPropertyStore(),
tableName);
+ }
+
+ /**
+ * Fetches the {@link RealtimeToOfflineSegmentsTaskMetadata} from
MINION_TASK_METADATA for given realtime table
+ * @param tableNameWithType realtime table name
+ */
+ public RealtimeToOfflineSegmentsTaskMetadata
getMinionRealtimeToOfflineSegmentsTaskMetadata(
+ String tableNameWithType) {
+ return MinionTaskMetadataUtils
+
.getRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+ MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
tableNameWithType);
+ }
+
+ /**
+ * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into
MINION_TASK_METADATA
+ * This call will override any previous metadata node
+ */
+ public void setRealtimeToOfflineSegmentsTaskMetadata(
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata) {
+
MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_pinotHelixResourceManager.getPropertyStore(),
+ MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
realtimeToOfflineSegmentsTaskMetadata, -1);
+ }
+
+ /**
* Get all tasks' state for the given task type.
*
* @param taskType Task type
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
index 93422f7..7a17718 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotTaskManager.java
@@ -48,7 +48,7 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotTaskManager.class);
private final PinotHelixTaskResourceManager _helixTaskResourceManager;
- private final ClusterInfoProvider _clusterInfoProvider;
+ private final ClusterInfoAccessor _clusterInfoAccessor;
private final TaskGeneratorRegistry _taskGeneratorRegistry;
public PinotTaskManager(PinotHelixTaskResourceManager
helixTaskResourceManager,
@@ -58,8 +58,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
controllerConf.getPinotTaskManagerInitialDelaySeconds(),
helixResourceManager, leadControllerManager,
controllerMetrics);
_helixTaskResourceManager = helixTaskResourceManager;
- _clusterInfoProvider = new ClusterInfoProvider(helixResourceManager,
helixTaskResourceManager, controllerConf);
- _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoProvider);
+ _clusterInfoAccessor = new ClusterInfoAccessor(helixResourceManager,
helixTaskResourceManager, controllerConf);
+ _taskGeneratorRegistry = new TaskGeneratorRegistry(_clusterInfoAccessor);
}
/**
@@ -69,8 +69,8 @@ public class PinotTaskManager extends
ControllerPeriodicTask<Void> {
*
* @return Cluster info provider
*/
- public ClusterInfoProvider getClusterInfoProvider() {
- return _clusterInfoProvider;
+ public ClusterInfoAccessor getClusterInfoAccessor() {
+ return _clusterInfoAccessor;
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
index dc4acf7..437ac93 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/ConvertToRawIndexTaskGenerator.java
@@ -26,7 +26,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.pinot.common.data.Segment;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -39,10 +39,10 @@ import org.slf4j.LoggerFactory;
public class ConvertToRawIndexTaskGenerator implements PinotTaskGenerator {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConvertToRawIndexTaskGenerator.class);
- private final ClusterInfoProvider _clusterInfoProvider;
+ private final ClusterInfoAccessor _clusterInfoAccessor;
- public ConvertToRawIndexTaskGenerator(ClusterInfoProvider
clusterInfoProvider) {
- _clusterInfoProvider = clusterInfoProvider;
+ public ConvertToRawIndexTaskGenerator(ClusterInfoAccessor
clusterInfoAccessor) {
+ _clusterInfoAccessor = clusterInfoAccessor;
}
@Override
@@ -56,7 +56,7 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
// Get the segments that are being converted so that we don't submit them
again
Set<Segment> runningSegments =
-
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
_clusterInfoProvider);
+
TaskGeneratorUtils.getRunningSegments(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
_clusterInfoAccessor);
for (TableConfig tableConfig : tableConfigs) {
// Only generate tasks for OFFLINE tables
@@ -90,7 +90,7 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
// Generate tasks
int tableNumTasks = 0;
- for (OfflineSegmentZKMetadata offlineSegmentZKMetadata :
_clusterInfoProvider
+ for (OfflineSegmentZKMetadata offlineSegmentZKMetadata :
_clusterInfoAccessor
.getOfflineSegmentsMetadata(offlineTableName)) {
// Generate up to tableMaxNumTasks tasks each time for each table
if (tableNumTasks == tableMaxNumTasks) {
@@ -111,7 +111,7 @@ public class ConvertToRawIndexTaskGenerator implements
PinotTaskGenerator {
configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
configs.put(MinionConstants.SEGMENT_NAME_KEY, segmentName);
configs.put(MinionConstants.DOWNLOAD_URL_KEY,
offlineSegmentZKMetadata.getDownloadUrl());
- configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoProvider.getVipUrl() + "/segments");
+ configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY,
String.valueOf(offlineSegmentZKMetadata.getCrc()));
if (columnsToConvertConfig != null) {
configs.put(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY,
columnsToConvertConfig);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
new file mode 100644
index 0000000..b505d28
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A {@link PinotTaskGenerator} implementation for generating tasks of type
{@link RealtimeToOfflineSegmentsTask}
+ *
+ * These will be generated only for REALTIME tables.
+ * At any given time, only 1 task of this type should be generated for a table.
+ *
+ * Steps:
+ * - The watermarkMs is read from the {@link
RealtimeToOfflineSegmentsTaskMetadata} ZNode
+ * found at
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
+ * In case of cold-start, no ZNode will exist.
+ * A new ZNode will be created, with watermarkMs as the smallest time found
in the COMPLETED segments
+ *
+ * - The execution window for the task is calculated as,
+ * windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs,
+ * where bucketTime can be provided in the taskConfigs (default 1d)
+ *
+ * - If the execution window is not older than bufferTimeMs, no task will be
generated,
+ * where bufferTime can be provided in the taskConfigs (default 2d)
+ *
+ * - Segment metadata is scanned for all COMPLETED segments,
+ * to pick those containing data in window [windowStartMs, windowEndMs)
+ *
+ * - There are some special considerations for using last completed segment
of a partition.
+ * Such segments will be checked for segment endTime, to ensure there's no
overflow into CONSUMING segments
+ *
+ * - A PinotTaskConfig is created, with segment information, execution
window, and any config specific to the task
+ */
+public class RealtimeToOfflineSegmentsTaskGenerator implements
PinotTaskGenerator {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskGenerator.class);
+
+ private static final String DEFAULT_BUCKET_PERIOD = "1d";
+ private static final String DEFAULT_BUFFER_PERIOD = "2d";
+
+ private final ClusterInfoAccessor _clusterInfoAccessor;
+
+ public RealtimeToOfflineSegmentsTaskGenerator(ClusterInfoAccessor
clusterInfoAccessor) {
+ _clusterInfoAccessor = clusterInfoAccessor;
+ }
+
+ @Override
+ public String getTaskType() {
+ return RealtimeToOfflineSegmentsTask.TASK_TYPE;
+ }
+
+ @Override
+ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
+ String taskType = RealtimeToOfflineSegmentsTask.TASK_TYPE;
+ List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+ for (TableConfig tableConfig : tableConfigs) {
+ String realtimeTableName = tableConfig.getTableName();
+
+ if (tableConfig.getTableType() != TableType.REALTIME) {
+ LOGGER.warn("Skip generating task: {} for non-REALTIME table: {}",
taskType, realtimeTableName);
+ continue;
+ }
+ if (new StreamConfig(realtimeTableName,
tableConfig.getIndexingConfig().getStreamConfigs())
+ .hasHighLevelConsumerType()) {
+ LOGGER.warn("Skip generating task: {} for HLC REALTIME table: {}",
taskType, realtimeTableName);
+ continue;
+ }
+ LOGGER.info("Start generating task configs for table: {} for task: {}",
realtimeTableName, taskType);
+
+ // Only schedule 1 task of this type, per table
+ Map<String, TaskState> incompleteTasks =
+ TaskGeneratorUtils.getIncompleteTasks(taskType, realtimeTableName,
_clusterInfoAccessor);
+ if (!incompleteTasks.isEmpty()) {
+ LOGGER
+ .warn("Found incomplete tasks: {} for same table: {}. Skipping
task generation.", incompleteTasks.keySet(),
+ realtimeTableName);
+ continue;
+ }
+
+ // Get all segment metadata for completed segments (DONE status).
+ List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata = new
ArrayList<>();
+ Map<Integer, String> partitionToLatestCompletedSegmentName = new
HashMap<>();
+ Set<Integer> allPartitions = new HashSet<>();
+ getCompletedSegmentsInfo(realtimeTableName, completedSegmentsMetadata,
partitionToLatestCompletedSegmentName,
+ allPartitions);
+ if (completedSegmentsMetadata.isEmpty()) {
+ LOGGER
+ .info("No realtime-completed segments found for table: {},
skipping task generation: {}", realtimeTableName,
+ taskType);
+ continue;
+ }
+ allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+ if (!allPartitions.isEmpty()) {
+ LOGGER
+ .info("Partitions: {} have no completed segments. Table: {} is not
ready for {}. Skipping task generation.",
+ allPartitions, realtimeTableName, taskType);
+ continue;
+ }
+
+ TableTaskConfig tableTaskConfig = tableConfig.getTaskConfig();
+ Preconditions.checkState(tableTaskConfig != null);
+ Map<String, String> taskConfigs =
tableTaskConfig.getConfigsForTaskType(taskType);
+ Preconditions.checkState(taskConfigs != null, "Task config shouldn't be
null for table: {}", realtimeTableName);
+
+ // Get the bucket size and buffer
+ String bucketTimePeriod =
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY,
DEFAULT_BUCKET_PERIOD);
+ String bufferTimePeriod =
+
taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY,
DEFAULT_BUFFER_PERIOD);
+ long bucketMs = TimeUtils.convertPeriodToMillis(bucketTimePeriod);
+ long bufferMs = TimeUtils.convertPeriodToMillis(bufferTimePeriod);
+
+ // Get watermark from RealtimeToOfflineSegmentsTaskMetadata ZNode.
WindowStart = watermark. WindowEnd = windowStart + bucket.
+ long windowStartMs = getWatermarkMs(realtimeTableName,
completedSegmentsMetadata, bucketMs);
+ long windowEndMs = windowStartMs + bucketMs;
+
+ // Check that execution window is older than bufferTime
+ if (windowEndMs > System.currentTimeMillis() - bufferMs) {
+ LOGGER.info(
+ "Window with start: {} and end: {} is not older than buffer time:
{} configured as {} ago. Skipping task generation: {}",
+ windowStartMs, windowEndMs, bufferMs, bufferTimePeriod, taskType);
+ continue;
+ }
+
+ // Find all COMPLETED segments with data overlapping execution window:
windowStart (inclusive) to windowEnd (exclusive)
+ List<String> segmentNames = new ArrayList<>();
+ List<String> downloadURLs = new ArrayList<>();
+ Set<String> lastCompletedSegmentPerPartition = new
HashSet<>(partitionToLatestCompletedSegmentName.values());
+ boolean skipGenerate = false;
+ for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata :
completedSegmentsMetadata) {
+ String segmentName = realtimeSegmentZKMetadata.getSegmentName();
+ TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
+ long segmentStartTimeMs =
timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
+ long segmentEndTimeMs =
timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+
+ // Check overlap with window
+ if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs <
windowEndMs) {
+ // If last completed segment is being used, make sure that segment
crosses over end of window.
+ // In the absence of this check, CONSUMING segments could contain
some portion of the window. That data would be skipped forever.
+ if (lastCompletedSegmentPerPartition.contains(segmentName) &&
segmentEndTimeMs < windowEndMs) {
+ LOGGER.info(
+ "Window data overflows into CONSUMING segments for partition
of segment: {}. Skipping task generation: {}",
+ segmentName, taskType);
+ skipGenerate = true;
+ break;
+ }
+ segmentNames.add(segmentName);
+ downloadURLs.add(realtimeSegmentZKMetadata.getDownloadUrl());
+ }
+ }
+
+ if (segmentNames.isEmpty() || skipGenerate) {
+ LOGGER.info("Found no eligible segments for task: {} with window [{} -
{}). Skipping task generation", taskType,
+ windowStartMs, windowEndMs);
+ continue;
+ }
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put(MinionConstants.TABLE_NAME_KEY, realtimeTableName);
+ configs.put(MinionConstants.SEGMENT_NAME_KEY,
StringUtils.join(segmentNames, ","));
+ configs.put(MinionConstants.DOWNLOAD_URL_KEY,
StringUtils.join(downloadURLs, MinionConstants.URL_SEPARATOR));
+ configs.put(MinionConstants.UPLOAD_URL_KEY,
_clusterInfoAccessor.getVipUrl() + "/segments");
+
+ // Execution window
+ configs.put(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
String.valueOf(windowStartMs));
+ configs.put(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
String.valueOf(windowEndMs));
+
+ // Segment processor configs
+ String timeColumnTransformationConfig =
+
taskConfigs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
+ if (timeColumnTransformationConfig != null) {
+
configs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY,
timeColumnTransformationConfig);
+ }
+ String collectorTypeConfig =
taskConfigs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
+ if (collectorTypeConfig != null) {
+ configs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
collectorTypeConfig);
+ }
+ for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
+ if
(entry.getKey().endsWith(RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX))
{
+ configs.put(entry.getKey(), entry.getValue());
+ }
+ }
+ String maxNumRecordsPerSegmentConfig =
+
taskConfigs.get(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY);
+ if (maxNumRecordsPerSegmentConfig != null) {
+
configs.put(RealtimeToOfflineSegmentsTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
maxNumRecordsPerSegmentConfig);
+ }
+
+ pinotTaskConfigs.add(new PinotTaskConfig(taskType, configs));
+ LOGGER.info("Finished generating task configs for table: {} for task:
{}", realtimeTableName, taskType);
+ }
+ return pinotTaskConfigs;
+ }
+
+ /**
+ * Fetch completed (non-consuming) segment and partition information
+ * @param realtimeTableName the realtime table name
+ * @param completedSegmentsMetadataList list for collecting the completed
segments metadata
+ * @param partitionToLatestCompletedSegmentName map for collecting the
partitionId to the latest completed segment name
+ * @param allPartitions set for collecting all partition ids
+ */
+ private void getCompletedSegmentsInfo(String realtimeTableName,
+ List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadataList,
+ Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer>
allPartitions) {
+ List<LLCRealtimeSegmentZKMetadata> realtimeSegmentsMetadataList =
+ _clusterInfoAccessor.getLLCRealtimeSegmentsMetadata(realtimeTableName);
+
+ Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
+ for (LLCRealtimeSegmentZKMetadata metadata : realtimeSegmentsMetadataList)
{
+ LLCSegmentName llcSegmentName = new
LLCSegmentName(metadata.getSegmentName());
+ allPartitions.add(llcSegmentName.getPartitionId());
+
+ if (metadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+ completedSegmentsMetadataList.add(metadata);
+ latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionId(),
(partitionId, latestLLCSegmentName) -> {
+ if (latestLLCSegmentName == null) {
+ return llcSegmentName;
+ } else {
+ if (llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
+ return llcSegmentName;
+ } else {
+ return latestLLCSegmentName;
+ }
+ }
+ });
+ }
+ }
+
+ for (Map.Entry<Integer, LLCSegmentName> entry :
latestLLCSegmentNameMap.entrySet()) {
+ partitionToLatestCompletedSegmentName.put(entry.getKey(),
entry.getValue().getSegmentName());
+ }
+ }
+
+ /**
+ * Get the watermark from the RealtimeToOfflineSegmentsMetadata ZNode.
+ * If the znode is null, computes the watermark using either the start time
config or the start time from segment metadata
+ */
+ private long getWatermarkMs(String realtimeTableName,
List<LLCRealtimeSegmentZKMetadata> completedSegmentsMetadata,
+ long bucketMs) {
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
+
_clusterInfoAccessor.getMinionRealtimeToOfflineSegmentsTaskMetadata(realtimeTableName);
+
+ if (realtimeToOfflineSegmentsTaskMetadata == null) {
+ // No ZNode exists. Cold-start.
+ long watermarkMs;
+
+ // Find the smallest time from all segments
+ RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+ for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata :
completedSegmentsMetadata) {
+ if (minSegmentZkMetadata == null ||
realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
+ .getStartTime()) {
+ minSegmentZkMetadata = realtimeSegmentZKMetadata;
+ }
+ }
+ Preconditions.checkState(minSegmentZkMetadata != null);
+
+ // Convert the segment minTime to millis
+ long minSegmentStartTimeMs =
minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+
+ // Round off according to the bucket. This ensures we align the offline
segments to proper time boundaries
+ // For example, if start time millis is 20200813T12:34:59, we want to
create the first segment for window [20200813, 20200814)
+ watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+
+ // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark
calculated above
+ realtimeToOfflineSegmentsTaskMetadata = new
RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
+
_clusterInfoAccessor.setRealtimeToOfflineSegmentsTaskMetadata(realtimeToOfflineSegmentsTaskMetadata);
+ }
+ return realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs();
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index ff8d37e..f112d8b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
@@ -33,8 +33,9 @@ import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManag
public class TaskGeneratorRegistry {
private final Map<String, PinotTaskGenerator> _taskGeneratorRegistry = new
HashMap<>();
/**
 * Registers the built-in task generators against their task types.
 */
public TaskGeneratorRegistry(@Nonnull ClusterInfoAccessor clusterInfoAccessor) {
  registerTaskGenerator(new ConvertToRawIndexTaskGenerator(clusterInfoAccessor));
  registerTaskGenerator(new RealtimeToOfflineSegmentsTaskGenerator(clusterInfoAccessor));
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
index 31f0c70..e4878a9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorUtils.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.controller.helix.core.minion.generator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.data.Segment;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
@@ -39,13 +40,13 @@ public class TaskGeneratorUtils {
* NOTE: we consider tasks not finished in one day as stuck and don't count
the segments in them
*
* @param taskType Task type
- * @param clusterInfoProvider Cluster info provider
+ * @param clusterInfoAccessor Cluster info accessor
* @return Set of running segments
*/
public static Set<Segment> getRunningSegments(@Nonnull String taskType,
- @Nonnull ClusterInfoProvider clusterInfoProvider) {
+ @Nonnull ClusterInfoAccessor clusterInfoAccessor) {
Set<Segment> runningSegments = new HashSet<>();
- Map<String, TaskState> taskStates =
clusterInfoProvider.getTaskStates(taskType);
+ Map<String, TaskState> taskStates =
clusterInfoAccessor.getTaskStates(taskType);
for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
// Skip COMPLETED tasks
if (entry.getValue() == TaskState.COMPLETED) {
@@ -54,13 +55,11 @@ public class TaskGeneratorUtils {
// Skip tasks scheduled for more than one day
String taskName = entry.getKey();
- long scheduleTimeMs = Long.parseLong(
-
taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR)
+ 1));
- if (System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS) {
+ if (isTaskOlderThanOneDay(taskName)) {
continue;
}
- for (PinotTaskConfig pinotTaskConfig :
clusterInfoProvider.getTaskConfigs(entry.getKey())) {
+ for (PinotTaskConfig pinotTaskConfig :
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
Map<String, String> configs = pinotTaskConfig.getConfigs();
runningSegments.add(
new Segment(configs.get(MinionConstants.TABLE_NAME_KEY),
configs.get(MinionConstants.SEGMENT_NAME_KEY)));
@@ -68,4 +67,41 @@ public class TaskGeneratorUtils {
}
return runningSegments;
}
+
+ /**
+ * Gets all the tasks for the provided task type and tableName, which do not
have TaskState COMPLETED
+ * @return map containing task name to task state for non-completed tasks
+ *
+ * NOTE: we consider tasks not finished in one day as stuck and don't count
them
+ */
+ public static Map<String, TaskState> getIncompleteTasks(String taskType,
String tableNameWithType,
+ ClusterInfoAccessor clusterInfoAccessor) {
+
+ Map<String, TaskState> nonCompletedTasks = new HashMap<>();
+ Map<String, TaskState> taskStates =
clusterInfoAccessor.getTaskStates(taskType);
+ for (Map.Entry<String, TaskState> entry : taskStates.entrySet()) {
+ if (entry.getValue() == TaskState.COMPLETED) {
+ continue;
+ }
+ String taskName = entry.getKey();
+ if (isTaskOlderThanOneDay(taskName)) {
+ continue;
+ }
+ for (PinotTaskConfig pinotTaskConfig :
clusterInfoAccessor.getTaskConfigs(entry.getKey())) {
+ if
(tableNameWithType.equals(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)))
{
+ nonCompletedTasks.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return nonCompletedTasks;
+ }
+
+ /**
+ * Returns true if task's schedule time is older than 1d
+ */
+ private static boolean isTaskOlderThanOneDay(String taskName) {
+ long scheduleTimeMs =
+
Long.parseLong(taskName.substring(taskName.lastIndexOf(PinotHelixTaskResourceManager.TASK_NAME_SEPARATOR)
+ 1));
+ return System.currentTimeMillis() - scheduleTimeMs > ONE_DAY_IN_MILLIS;
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
new file mode 100644
index 0000000..5aa3377
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -0,0 +1,452 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.minion.generator;
+
+import com.google.common.collect.Lists;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
+import org.apache.pinot.core.common.MinionConstants;
+import
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+import org.apache.pinot.core.minion.PinotTaskConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link RealtimeToOfflineSegmentsTaskGenerator}
+ */
+public class RealtimeToOfflineSegmentsTaskGeneratorTest {
+
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String REALTIME_TABLE_NAME = "testTable_REALTIME";
+ private static final String TIME_COLUMN_NAME = "millisSinceEpoch";
+ private final Map<String, String> streamConfigs = new HashMap<>();
+
+ @BeforeClass
+ public void setup() {
+ streamConfigs.put(StreamConfigProperties.STREAM_TYPE, "kafka");
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty("kafka",
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ StreamConfig.ConsumerType.LOWLEVEL.toString());
+ streamConfigs.put(StreamConfigProperties.constructStreamProperty("kafka",
StreamConfigProperties.STREAM_TOPIC_NAME),
+ "myTopic");
+ streamConfigs
+ .put(StreamConfigProperties.constructStreamProperty("kafka",
StreamConfigProperties.STREAM_DECODER_CLASS),
+ "org.foo.Decoder");
+ }
+
+ private TableConfig getRealtimeTableConfig(Map<String, Map<String, String>>
taskConfigsMap) {
+ return new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
+ .setStreamConfigs(streamConfigs).setTaskConfig(new
TableTaskConfig(taskConfigsMap)).build();
+ }
+
+ /**
+ * Tests for some config checks
+ */
+ @Test
+ public void testGenerateTasksCheckConfigs() {
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+ LLCRealtimeSegmentZKMetadata metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
5000, 50_000, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1));
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+ // Skip task generation, if offline table
+ TableConfig offlineTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // No tableTaskConfig, error
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+ realtimeTableConfig.setTaskConfig(null);
+ try {
+ generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ Assert.fail("Should have failed for null tableTaskConfig");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+
+ // No taskConfig for task, error
+ realtimeTableConfig = getRealtimeTableConfig(new HashMap<>());
+ try {
+ generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ Assert.fail("Should have failed for null taskConfig");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ /**
+ * Tests for some constraints on simultaneous tasks scheduled
+ */
+ @Test
+ public void testGenerateTasksSimultaneousConstraints() {
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+ Map<String, TaskState> taskStatesMap = new HashMap<>();
+ String taskName = "Task_RealtimeToOfflineSegmentsTask_" +
System.currentTimeMillis();
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(MinionConstants.TABLE_NAME_KEY, REALTIME_TABLE_NAME);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(taskStatesMap);
+ when(mockClusterInfoProvide.getTaskConfigs(taskName))
+ .thenReturn(Lists.newArrayList(new
PinotTaskConfig(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs)));
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+ LLCRealtimeSegmentZKMetadata metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
80_000_000, 90_000_000,
+ TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1));
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+ // if same task and table, IN_PROGRESS, then don't generate again
+ taskStatesMap.put(taskName, TaskState.IN_PROGRESS);
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // if same task and table, but COMPLETED, generate
+ taskStatesMap.put(taskName, TaskState.COMPLETED);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+
+ // if same task and table, IN_PROGRESS, but older than 1 day, generate
+ String oldTaskName =
+ "Task_RealtimeToOfflineSegmentsTask_" + (System.currentTimeMillis() -
TimeUnit.DAYS.toMillis(3));
+ taskStatesMap.remove(taskName);
+ taskStatesMap.put(oldTaskName, TaskState.IN_PROGRESS);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ }
+
+ /**
+ * Tests for realtime table with no segments
+ */
+ @Test
+ public void testGenerateTasksNoSegments() {
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ // No segments in table
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME)).thenReturn(Lists.newArrayList());
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // No COMPLETED segments in table
+ LLCRealtimeSegmentZKMetadata seg1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345",
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(seg1));
+
+ generator = new
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // 2 partitions. No COMPLETED segments for partition 0
+ LLCRealtimeSegmentZKMetadata seg2 =
+ getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE,
5000, 10000, TimeUnit.MILLISECONDS, null);
+ LLCRealtimeSegmentZKMetadata seg3 =
+ getRealtimeSegmentZKMetadata("testTable__1__1__13456",
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(seg1, seg2, seg3));
+
+ generator = new
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+ }
+
+ /**
+ * Test cold start. No minion metadata exists. Watermark is calculated based
on config or existing segments
+ */
+ @Test
+ public void testGenerateTasksNoMinionMetadata() {
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME)).thenReturn(null);
+ LLCRealtimeSegmentZKMetadata seg1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
1590048000000L, 1590134400000L,
+ TimeUnit.MILLISECONDS, "download1"); // 21 May 2020 8am to 22 May
2020 8am UTC
+ LLCRealtimeSegmentZKMetadata seg2 =
+ getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE,
1590048000000L, 1590134400000L,
+ TimeUnit.MILLISECONDS, "download2"); // 21 May 2020 8am to 22 May
2020 8am UTC
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(seg1, seg2));
+
+ // StartTime calculated using segment metadata
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(),
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY),
REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
"testTable__0__0__12345,testTable__1__0__12345");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY),
"download1,download2");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
"1590019200000"); // 21 May 2020 UTC
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY),
"1590105600000"); // 22 May 2020 UTC
+
+ // Segment metadata in hoursSinceEpoch
+ seg1 = getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
441680L, 441703L, TimeUnit.HOURS,
+ "download1"); // 21 May 2020 8am to 22 May 2020 8am UTC
+ seg2 = getRealtimeSegmentZKMetadata("testTable__1__0__12345", Status.DONE,
441680L, 441703L, TimeUnit.HOURS,
+ "download2"); // 21 May 2020 8am to 22 May 2020 8am UTC
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(seg1, seg2));
+ generator = new
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(),
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY),
REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
"testTable__0__0__12345,testTable__1__0__12345");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY),
"download1,download2");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
"1590019200000"); // 21 May 2020 UTC
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY),
"1590105600000"); // 22 May 2020 UTC
+ }
+
+ /**
+ * Tests for subsequent runs after cold start
+ */
+ @Test
+ public void testGenerateTasksWithMinionMetadata() {
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); //
21 May 2020 UTC
+ LLCRealtimeSegmentZKMetadata seg1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
1589972400000L, 1590048000000L,
+ TimeUnit.MILLISECONDS, "download1"); // 05-20-2020T11:00:00 to
05-21-2020T08:00:00 UTC
+ LLCRealtimeSegmentZKMetadata seg2 =
+ getRealtimeSegmentZKMetadata("testTable__0__1__12345", Status.DONE,
1590048000000L, 1590134400000L,
+ TimeUnit.MILLISECONDS, "download2"); // 05-21-2020T08:00:00 UTC to
05-22-2020T08:00:00 UTC
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(seg1, seg2));
+
+ // Default configs
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(),
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ Map<String, String> configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY),
REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
"testTable__0__0__12345,testTable__0__1__12345");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY),
"download1,download2");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
"1590019200000"); // 5-21-2020
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY),
"1590105600000"); // 5-22-2020
+
+ // No segments match
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590490800000L)); //
26 May 2020 UTC
+ generator = new
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 0);
+
+ // Some segments match
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 1590019200000L)); //
21 May 2020 UTC
+ taskConfigsMap = new HashMap<>();
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(RealtimeToOfflineSegmentsTask.BUCKET_TIME_PERIOD_KEY,
"2h");
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+ realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(),
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY),
REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
"testTable__0__0__12345");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY), "download1");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+ "1590019200000"); // 05-21-2020T00:00:00
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY),
"1590026400000"); // 05-21-2020T02:00:00
+
+ // Segment Processor configs
+ taskConfigsMap = new HashMap<>();
+ taskConfigs = new HashMap<>();
+
taskConfigs.put(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY,
"foo");
+ taskConfigs.put(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
+ taskConfigs.put("m1" +
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX, "MAX");
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+ realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+ generator = new
RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ assertEquals(pinotTaskConfigs.get(0).getTaskType(),
RealtimeToOfflineSegmentsTask.TASK_TYPE);
+ configs = pinotTaskConfigs.get(0).getConfigs();
+ assertEquals(configs.get(MinionConstants.TABLE_NAME_KEY),
REALTIME_TABLE_NAME);
+ assertEquals(configs.get(MinionConstants.SEGMENT_NAME_KEY),
"testTable__0__0__12345,testTable__0__1__12345");
+ assertEquals(configs.get(MinionConstants.DOWNLOAD_URL_KEY),
"download1,download2");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY),
+ "1590019200000"); // 05-21-2020T00:00:00
+ assertEquals(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY),
"1590105600000"); // 05-22-2020T00:00:00
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY),
"foo");
+
assertEquals(configs.get(RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY),
"rollup");
+ assertEquals(configs.get("m1" +
RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX), "MAX");
+ }
+
+ /**
+ * Tests for skipping task generation due to CONSUMING segments overlap with
window
+ */
+ @Test
+ public void testOverflowIntoConsuming() {
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, 100_000L));
+ LLCRealtimeSegmentZKMetadata metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
50_000, 150_000, TimeUnit.MILLISECONDS,
+ null);
+ LLCRealtimeSegmentZKMetadata metadata2 =
+ getRealtimeSegmentZKMetadata("testTable__0__1__12345",
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1, metadata2));
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+ // last COMPLETED segment's endTime is less than windowEnd time. CONSUMING
segment overlap. Skip task
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
100_000, 200_000, TimeUnit.MILLISECONDS,
+ null);
+ metadata2 =
+ getRealtimeSegmentZKMetadata("testTable__0__1__12345",
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1, metadata2));
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // last completed segment endtime ends at window end, allow
+ metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
200_000, 86_500_000, TimeUnit.MILLISECONDS,
+ null);
+ metadata2 =
+ getRealtimeSegmentZKMetadata("testTable__0__1__12345",
Status.IN_PROGRESS, -1, -1, TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1, metadata2));
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertEquals(pinotTaskConfigs.size(), 1);
+ }
+
+ @Test
+ public void testBuffer() {
+ Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, new
HashMap<>());
+ TableConfig realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ // default buffer - 2d
+ long now = System.currentTimeMillis();
+ long watermarkMs = now - TimeUnit.DAYS.toMillis(1);
+ ClusterInfoAccessor mockClusterInfoProvide =
mock(ClusterInfoAccessor.class);
+
when(mockClusterInfoProvide.getTaskStates(RealtimeToOfflineSegmentsTask.TASK_TYPE)).thenReturn(new
HashMap<>());
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+ LLCRealtimeSegmentZKMetadata metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
watermarkMs - 100, watermarkMs + 100,
+ TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1));
+
+ RealtimeToOfflineSegmentsTaskGenerator generator =
+ new RealtimeToOfflineSegmentsTaskGenerator(mockClusterInfoProvide);
+
+ List<PinotTaskConfig> pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+
+ // custom buffer
+ Map<String, String> taskConfigs = new HashMap<>();
+ taskConfigs.put(RealtimeToOfflineSegmentsTask.BUFFER_TIME_PERIOD_KEY,
"15d");
+ taskConfigsMap.put(RealtimeToOfflineSegmentsTask.TASK_TYPE, taskConfigs);
+ realtimeTableConfig = getRealtimeTableConfig(taskConfigsMap);
+
+ watermarkMs = now - TimeUnit.DAYS.toMillis(10);
+
when(mockClusterInfoProvide.getMinionRealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(new
RealtimeToOfflineSegmentsTaskMetadata(REALTIME_TABLE_NAME, watermarkMs));
+ metadata1 =
+ getRealtimeSegmentZKMetadata("testTable__0__0__12345", Status.DONE,
watermarkMs - 100, watermarkMs + 100,
+ TimeUnit.MILLISECONDS, null);
+
when(mockClusterInfoProvide.getLLCRealtimeSegmentsMetadata(REALTIME_TABLE_NAME))
+ .thenReturn(Lists.newArrayList(metadata1));
+
+ pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(realtimeTableConfig));
+ assertTrue(pinotTaskConfigs.isEmpty());
+ }
+
+ private LLCRealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String
segmentName, Status status, long startTime,
+ long endTime, TimeUnit timeUnit, String downloadURL) {
+ LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new
LLCRealtimeSegmentZKMetadata();
+ realtimeSegmentZKMetadata.setSegmentName(segmentName);
+ realtimeSegmentZKMetadata.setStatus(status);
+ realtimeSegmentZKMetadata.setStartTime(startTime);
+ realtimeSegmentZKMetadata.setEndTime(endTime);
+ realtimeSegmentZKMetadata.setTimeUnit(timeUnit);
+ realtimeSegmentZKMetadata.setDownloadUrl(downloadURL);
+ return realtimeSegmentZKMetadata;
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index f2049ea..cd98833 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -62,16 +62,30 @@ public class MinionConstants {
public static final String MERGED_SEGMENT_NAME_KEY =
"mergedSegmentNameKey";
}
+ /**
+ * Creates segments for the OFFLINE table, using completed segments from the
corresponding REALTIME table
+ */
public static class RealtimeToOfflineSegmentsTask {
- public static final String TASK_TYPE = "realtimeToOfflineSegmentsTask";
- // window
- public static final String WINDOW_START_MILLIS_KEY = "windowStartMillis";
- public static final String WINDOW_END_MILLIS_KEY = "windowEndMillis";
- // segment processing
+ public static final String TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+
+ /**
+ * The time window size for the task.
+ * e.g. if set to "1d", then task is scheduled to run for a 1 day window
+ */
+ public static final String BUCKET_TIME_PERIOD_KEY = "bucketTimePeriod";
+ /**
+ * The time period to wait before picking segments for this task
+ * e.g. if set to "2d", no task will be scheduled for a time window
younger than 2 days
+ */
+ public static final String BUFFER_TIME_PERIOD_KEY = "bufferTimePeriod";
+
+ // Window start and window end set by task generator
+ public static final String WINDOW_START_MS_KEY = "windowStartMs";
+ public static final String WINDOW_END_MS_KEY = "windowEndMs";
+ // Segment processing related configs
public static final String TIME_COLUMN_TRANSFORM_FUNCTION_KEY =
"timeColumnTransformFunction";
public static final String COLLECTOR_TYPE_KEY = "collectorType";
public static final String AGGREGATION_TYPE_KEY_SUFFIX =
".aggregationType";
public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY =
"maxNumRecordsPerSegment";
-
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
index a947d29..a09f3b5 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentMapper.java
@@ -50,9 +50,9 @@ import org.slf4j.LoggerFactory;
* Mapper phase of the SegmentProcessorFramework.
* Reads the input segment and creates partitioned avro data files
* Performs:
- * - record transformations
+ * - record filtering
+ * - column transformations
* - partitioning
- * - partition filtering
*/
public class SegmentMapper {
@@ -74,8 +74,8 @@ public class SegmentMapper {
_mapperId = mapperId;
_avroSchema =
SegmentProcessorUtils.convertPinotSchemaToAvroSchema(mapperConfig.getPinotSchema());
- _recordTransformer =
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
_recordFilter =
RecordFilterFactory.getRecordFilter(mapperConfig.getRecordFilterConfig());
+ _recordTransformer =
RecordTransformerFactory.getRecordTransformer(mapperConfig.getRecordTransformerConfig());
for (PartitionerConfig partitionerConfig :
mapperConfig.getPartitionerConfigs()) {
_partitioners.add(PartitionerFactory.getPartitioner(partitionerConfig));
}
@@ -101,14 +101,14 @@ public class SegmentMapper {
while (segmentRecordReader.hasNext()) {
reusableRow = segmentRecordReader.next(reusableRow);
- // Record transformation
- reusableRow = _recordTransformer.transformRecord(reusableRow);
-
// Record filtering
if (_recordFilter.filter(reusableRow)) {
continue;
}
+ // Record transformation
+ reusableRow = _recordTransformer.transformRecord(reusableRow);
+
// Partitioning
int p = 0;
for (Partitioner partitioner : _partitioners) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index 1856b9d..88857e8 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -275,7 +275,7 @@ public class SegmentMapperTest {
SegmentMapperConfig config11 = new SegmentMapperConfig(_pinotSchema,
new
RecordTransformerConfig.Builder().setTransformFunctionsMap(transformFunctionMap).build(),
new
RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
- .setFilterFunction("Groovy({timeValue != 1597795200000},
timeValue)").build(), Lists.newArrayList(
+ .setFilterFunction("Groovy({timeValue < 1597795200000L|| timeValue
>= 1597881600000}, timeValue)").build(), Lists.newArrayList(
new
PartitionerConfig.Builder().setPartitionerType(PartitionerFactory.PartitionerType.COLUMN_VALUE)
.setColumnName("timeValue").build()));
Map<String, List<Object[]>> expectedRecords11 =
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
new file mode 100644
index 0000000..3f80e95
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.helix.task.TaskState;
+import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
+import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
+import org.apache.pinot.core.common.MinionConstants;
+import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Integration test for minion task of type "RealtimeToOfflineSegmentsTask"
+ * With every task run, a new segment is created in the offline table for 1
day. Watermark also keeps progressing accordingly.
+ */
+public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends
RealtimeClusterIntegrationTest {
+
+ private PinotHelixTaskResourceManager _helixTaskResourceManager;
+ private PinotTaskManager _taskManager;
+ private PinotHelixResourceManager _pinotHelixResourceManager;
+
+ private long _dataSmallestTimeMillis;
+ private long _dateSmallestDays;
+ private String _realtimeTableName;
+ private String _offlineTableName;
+
+ @Override
+ protected TableTaskConfig getTaskConfig() {
+ return new TableTaskConfig(
+
Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
new HashMap<>()));
+ }
+
+ @Override
+ protected boolean useLlc() {
+ return true;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ // Setup realtime table, and blank offline table
+ super.setUp();
+ addTableConfig(createOfflineTableConfig());
+ startMinion(null, null);
+
+ _helixTaskResourceManager =
_controllerStarter.getHelixTaskResourceManager();
+ _taskManager = _controllerStarter.getTaskManager();
+ _pinotHelixResourceManager = _controllerStarter.getHelixResourceManager();
+
+ _realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ _offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
+
+ List<RealtimeSegmentZKMetadata> realtimeSegmentMetadata =
+
_pinotHelixResourceManager.getRealtimeSegmentMetadata(_realtimeTableName);
+ long minSegmentTime = Long.MAX_VALUE;
+ for (RealtimeSegmentZKMetadata metadata : realtimeSegmentMetadata) {
+ if (metadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
+ if (metadata.getStartTime() < minSegmentTime) {
+ minSegmentTime = metadata.getStartTime();
+ }
+ }
+ }
+ _dataSmallestTimeMillis = minSegmentTime;
+ _dateSmallestDays = minSegmentTime / 86400000;
+ }
+
+ @Test
+ public void testRealtimeToOfflineSegmentsTask() {
+
+ List<OfflineSegmentZKMetadata> offlineSegmentMetadata =
+
_pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+ Assert.assertTrue(offlineSegmentMetadata.isEmpty());
+
+ long expectedWatermark = _dataSmallestTimeMillis;
+ int numOfflineSegments = 0;
+ long offlineSegmentTime = _dateSmallestDays;
+ for (int i = 0; i < 3; i++) {
+ // Schedule task
+ Assert.assertTrue(
+
_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+ Assert.assertTrue(_helixTaskResourceManager.getTaskQueues().contains(
+
PinotHelixTaskResourceManager.getHelixJobQueueName(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE)));
+ // Should not generate more tasks
+ Assert.assertFalse(
+
_taskManager.scheduleTasks().containsKey(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE));
+
+ expectedWatermark = expectedWatermark + 86400000;
+ // Wait at most 600 seconds for all tasks COMPLETED
+ waitForTaskToComplete(expectedWatermark);
+ // check segment is in offline
+ offlineSegmentMetadata =
_pinotHelixResourceManager.getOfflineSegmentMetadata(_offlineTableName);
+ Assert.assertEquals(offlineSegmentMetadata.size(), ++numOfflineSegments);
+ Assert.assertEquals(offlineSegmentMetadata.get(i).getStartTime(),
offlineSegmentTime);
+ Assert.assertEquals(offlineSegmentMetadata.get(i).getEndTime(),
offlineSegmentTime);
+ offlineSegmentTime++;
+ }
+ testHardcodedSqlQueries();
+ }
+
+ private void waitForTaskToComplete(long expectedWatermark) {
+ TestUtils.waitForCondition(input -> {
+ // Check task state
+ for (TaskState taskState : _helixTaskResourceManager
+
.getTaskStates(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE).values())
{
+ if (taskState != TaskState.COMPLETED) {
+ return false;
+ }
+ }
+ return true;
+ }, 600_000L, "Failed to complete task");
+
+ // Check segment ZK metadata
+ RealtimeToOfflineSegmentsTaskMetadata minionTaskMetadata =
+
_taskManager.getClusterInfoAccessor().getMinionRealtimeToOfflineSegmentsTaskMetadata(_realtimeTableName);
+ Assert.assertNotNull(minionTaskMetadata);
+ Assert.assertEquals(minionTaskMetadata.getWatermarkMs(),
expectedWatermark);
+ }
+
+ @Test(enabled = false)
+ public void testSegmentListApi() {
+ }
+
+ @Test(enabled = false)
+ public void testBrokerDebugOutput() {
+ }
+
+ @Test(enabled = false)
+ public void testBrokerDebugRoutingTableSQL() {
+ }
+
+ @Test(enabled = false)
+ public void testBrokerResponseMetadata() {
+ }
+
+ @Test(enabled = false)
+ public void testDictionaryBasedQueries() {
+ }
+
+ @Test(enabled = false)
+ public void testGeneratedQueriesWithMultiValues() {
+ }
+
+ @Test(enabled = false)
+ public void testGeneratedQueriesWithoutMultiValues() {
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedQueries() {
+ }
+
+ @Test(enabled = false)
+ public void testHardcodedSqlQueries() {
+ }
+
+ @Test(enabled = false)
+ public void testInstanceShutdown() {
+ }
+
+ @Test(enabled = false)
+ public void testQueriesFromQueryFile() {
+ }
+
+ @Test(enabled = false)
+ public void testQueryExceptions() {
+ }
+
+ @Test(enabled = false)
+ public void testReload(boolean includeOfflineTable) {
+ }
+
+ @Test(enabled = false)
+ public void testSqlQueriesFromQueryFile() {
+ }
+
+ @Test(enabled = false)
+ public void testVirtualColumnQueries() {
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws Exception {
+ stopMinion();
+
+ super.tearDown();
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
index a0be8ed..5232b7a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SimpleMinionClusterIntegrationTest.java
@@ -27,7 +27,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.helix.task.TaskState;
-import org.apache.pinot.controller.helix.core.minion.ClusterInfoProvider;
+import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
@@ -94,7 +94,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
_taskManager = _controllerStarter.getTaskManager();
// Register the test task generator into task manager
- _taskManager.registerTaskGenerator(new
TestTaskGenerator(_taskManager.getClusterInfoProvider()));
+ _taskManager.registerTaskGenerator(new
TestTaskGenerator(_taskManager.getClusterInfoAccessor()));
Map<String, PinotTaskExecutorFactory> taskExecutorFactoryRegistry =
Collections.singletonMap(TestTaskGenerator.TASK_TYPE, new
TestTaskExecutorFactory());
@@ -199,10 +199,10 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
private static class TestTaskGenerator implements PinotTaskGenerator {
public static final String TASK_TYPE = "TestTask";
- private final ClusterInfoProvider _clusterInfoProvider;
+ private final ClusterInfoAccessor _clusterInfoAccessor;
- public TestTaskGenerator(ClusterInfoProvider clusterInfoProvider) {
- _clusterInfoProvider = clusterInfoProvider;
+ public TestTaskGenerator(ClusterInfoAccessor clusterInfoAccessor) {
+ _clusterInfoAccessor = clusterInfoAccessor;
}
@Override
@@ -215,7 +215,7 @@ public class SimpleMinionClusterIntegrationTest extends
ClusterTest {
assertEquals(tableConfigs.size(), 2);
// Generate at most 2 tasks
- if (_clusterInfoProvider.getTaskStates(TASK_TYPE).size() >= 2) {
+ if (_clusterInfoAccessor.getTaskStates(TASK_TYPE).size() >= 2) {
return Collections.emptyList();
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index fd82dda..abb0788 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -42,6 +42,7 @@ import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.minion.events.EventObserverFactoryRegistry;
import org.apache.pinot.minion.events.MinionEventObserverFactory;
+import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
import org.apache.pinot.minion.executor.PinotTaskExecutorFactory;
import org.apache.pinot.minion.executor.TaskExecutorFactoryRegistry;
import org.apache.pinot.minion.metrics.MinionMeter;
@@ -81,7 +82,8 @@ public class MinionStarter implements ServiceStartable {
+ CommonConstants.Minion.DEFAULT_HELIX_PORT);
setupHelixSystemProperties();
_helixManager = new ZKHelixManager(helixClusterName, _instanceId,
InstanceType.PARTICIPANT, zkAddress);
- _taskExecutorFactoryRegistry = new TaskExecutorFactoryRegistry();
+ MinionTaskZkMetadataManager minionTaskZkMetadataManager = new
MinionTaskZkMetadataManager(_helixManager);
+ _taskExecutorFactoryRegistry = new
TaskExecutorFactoryRegistry(minionTaskZkMetadataManager);
_eventObserverFactoryRegistry = new EventObserverFactoryRegistry();
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
index a17b10e..ee7be9e 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseMultipleSegmentsConversionExecutor.java
@@ -64,9 +64,23 @@ public abstract class BaseMultipleSegmentsConversionExecutor
extends BaseTaskExe
File workingDir)
throws Exception;
+ /**
+ * Pre processing operations to be done at the beginning of task execution
+ */
+ protected void preProcess(PinotTaskConfig pinotTaskConfig) {
+ }
+
+ /**
+ * Post processing operations to be done before exiting a successful task
execution
+ */
+ protected void postProcess(PinotTaskConfig pinotTaskConfig) {
+ }
+
@Override
public List<SegmentConversionResult> executeTask(PinotTaskConfig
pinotTaskConfig)
throws Exception {
+ preProcess(pinotTaskConfig);
+
String taskType = pinotTaskConfig.getTaskType();
Map<String, String> configs = pinotTaskConfig.getConfigs();
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
@@ -141,6 +155,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String outputSegmentNames =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
.collect(Collectors.joining(","));
+ postProcess(pinotTaskConfig);
LOGGER
.info("Done executing {} on table: {}, input segments: {}, output
segments: {}", taskType, tableNameWithType,
inputSegmentNames, outputSegmentNames);
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
index 619acc7..6875032 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/BaseTaskExecutor.java
@@ -43,8 +43,7 @@ public abstract class BaseTaskExecutor implements
PinotTaskExecutor {
}
protected Schema getSchema(String tableName) {
- Schema schema =
-
ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(),
tableName);
+ Schema schema =
ZKMetadataProvider.getTableSchema(MINION_CONTEXT.getHelixPropertyStore(),
tableName);
Preconditions.checkState(schema != null, "Failed to find schema for table:
%s", tableName);
return schema;
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
new file mode 100644
index 0000000..29354b3
--- /dev/null
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/MinionTaskZkMetadataManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.minion.executor;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.MinionTaskMetadataUtils;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
+import
org.apache.pinot.core.common.MinionConstants.RealtimeToOfflineSegmentsTask;
+
+
+/**
+ * An abstraction on top of {@link HelixManager}, created for the {@link
PinotTaskExecutor}, restricted to only get/update minion task metadata
+ */
+public class MinionTaskZkMetadataManager {
+ private final HelixManager _helixManager;
+
+ public MinionTaskZkMetadataManager(HelixManager helixManager) {
+ _helixManager = helixManager;
+ }
+
+ /**
+ * Fetch the ZNRecord under
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask for the given
tableNameWithType
+ */
+ public ZNRecord getRealtimeToOfflineSegmentsTaskZNRecord(String
tableNameWithType) {
+ return MinionTaskMetadataUtils
+
.fetchMinionTaskMetadataZNRecord(_helixManager.getHelixPropertyStore(),
RealtimeToOfflineSegmentsTask.TASK_TYPE,
+ tableNameWithType);
+ }
+
+ /**
+ * Sets the {@link RealtimeToOfflineSegmentsTaskMetadata} into the ZNode at
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask
+ * for the corresponding tableNameWithType
+ * @param expectedVersion Version expected to be updating, failing the call
if there's a mismatch
+ */
+ public void setRealtimeToOfflineSegmentsTaskMetadata(
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata, int expectedVersion) {
+
MinionTaskMetadataUtils.persistRealtimeToOfflineSegmentsTaskMetadata(_helixManager.getHelixPropertyStore(),
+ RealtimeToOfflineSegmentsTask.TASK_TYPE,
realtimeToOfflineSegmentsTaskMetadata, expectedVersion);
+ }
+}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 936f027..82cb2fe 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -28,6 +28,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.helix.ZNRecord;
+import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.core.segment.processing.collector.CollectorConfig;
@@ -46,25 +48,71 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A task to convert segments from a REALTIME table to segments for its
corresponding OFFLINE table.
- * The realtime segments could span across multiple time windows. This task
extracts data and creates segments for a configured time range.
+ * The realtime segments could span across multiple time windows.
+ * This task extracts data and creates segments for a configured time range.
* The {@link SegmentProcessorFramework} is used for the segment conversion,
which also does
- * 1. time column rollup
- * 2. time window extraction using filter function
+ * 1. time window extraction using filter function
+ * 2. time column rollup
* 3. partitioning using table config's segmentPartitioningConfig
* 4. aggregations and rollup
* 5. data sorting
+ *
+ * Before beginning the task, the <code>watermarkMs</code> is checked in the
minion task metadata ZNode,
+ * located at
MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/<tableNameWithType>
+ * It should match the <code>windowStartMs</code>.
+ * The version of the znode is cached.
+ *
+ * After the segments are uploaded, this task updates the
<code>watermarkMs</code> in the minion task metadata ZNode.
+ * The znode version is checked during update,
+ * and update only succeeds if version matches with the previously cached
version
*/
public class RealtimeToOfflineSegmentsTaskExecutor extends
BaseMultipleSegmentsConversionExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
private static final String INPUT_SEGMENTS_DIR = "input_segments";
private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
+ private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+ private int _expectedVersion = Integer.MIN_VALUE;
+ private long _nextWatermark;
+
+ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager
minionTaskZkMetadataManager) {
+ _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+ }
+
+ /**
+ * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime
table.
+ * Checks that the <code>watermarkMs</code> from the ZNode matches the
windowStartMs in the task configs.
+ * If yes, caches the ZNode version to check during update.
+ */
+ @Override
+ public void preProcess(PinotTaskConfig pinotTaskConfig) {
+ Map<String, String> configs = pinotTaskConfig.getConfigs();
+ String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+
+ ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+
_minionTaskZkMetadataManager.getRealtimeToOfflineSegmentsTaskZNRecord(realtimeTableName);
+ Preconditions.checkState(realtimeToOfflineSegmentsTaskZNRecord != null,
+ "RealtimeToOfflineSegmentsTaskMetadata ZNRecord for table: %s should
not be null. Exiting task.",
+ realtimeTableName);
+
+ RealtimeToOfflineSegmentsTaskMetadata
realtimeToOfflineSegmentsTaskMetadata =
+
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+ long windowStartMs =
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+
Preconditions.checkState(realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs()
== windowStartMs,
+ "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s does not
match windowStartMs: %d in task configs for table: %s. "
+ + "ZNode may have been modified by another task",
realtimeToOfflineSegmentsTaskMetadata, windowStartMs,
+ realtimeTableName);
+
+ _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+ }
+
@Override
protected List<SegmentConversionResult> convert(PinotTaskConfig
pinotTaskConfig, List<File> originalIndexDirs,
File workingDir)
@@ -74,19 +122,22 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends
BaseMultipleSegmentsC
LOGGER.info("Starting task: {} with configs: {}", taskType, configs);
long startMillis = System.currentTimeMillis();
- String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); //
rawTableName_OFFLINE expected here
- TableConfig tableConfig = getTableConfig(tableNameWithType);
- Schema schema = getSchema(tableNameWithType);
+ String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
+ String rawTableName =
TableNameBuilder.extractRawTableName(realtimeTableName);
+ String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+ TableConfig tableConfig = getTableConfig(offlineTableName);
+ Schema schema = getSchema(offlineTableName);
Set<String> schemaColumns = schema.getPhysicalColumnNames();
String timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumn);
Preconditions
.checkState(dateTimeFieldSpec != null, "No valid spec found for time
column: %s in schema for table: %s",
- timeColumn, tableNameWithType);
+ timeColumn, offlineTableName);
+
+ long windowStartMs =
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY));
+ long windowEndMs =
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
+ _nextWatermark = windowEndMs;
- long windowStartMs =
-
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY));
- long windowEndMs =
Long.parseLong(configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY));
String timeColumnTransformFunction =
configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY);
String collectorTypeStr =
configs.get(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY);
@@ -120,7 +171,7 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends
BaseMultipleSegmentsC
if (tableConfig.getIndexingConfig().getSegmentPartitionConfig() != null) {
Map<String, ColumnPartitionConfig> columnPartitionMap =
tableConfig.getIndexingConfig().getSegmentPartitionConfig().getColumnPartitionMap();
- PartitionerConfig partitionerConfig =
getPartitionerConfig(columnPartitionMap, tableNameWithType, schemaColumns);
+ PartitionerConfig partitionerConfig =
getPartitionerConfig(columnPartitionMap, offlineTableName, schemaColumns);
segmentProcessorConfigBuilder.setPartitionerConfigs(Lists.newArrayList(partitionerConfig));
}
@@ -162,12 +213,25 @@ public class RealtimeToOfflineSegmentsTaskExecutor
extends BaseMultipleSegmentsC
for (File file : outputSegmentsDir.listFiles()) {
String outputSegmentName = file.getName();
results.add(new
SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
- .setTableNameWithType(tableNameWithType).build());
+ .setTableNameWithType(offlineTableName).build());
}
return results;
}
/**
+ * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime
table.
+ * Checks that the version of the ZNode matches with the version cached
earlier. If yes, proceeds to update watermark in the ZNode
+ * TODO: Making the minion task update the ZK metadata is an anti-pattern,
however cannot see another way to do it
+ */
+ @Override
+ public void postProcess(PinotTaskConfig pinotTaskConfig) {
+ String realtimeTableName =
pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY);
+ RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
+ new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName,
_nextWatermark);
+
_minionTaskZkMetadataManager.setRealtimeToOfflineSegmentsTaskMetadata(newMinionMetadata,
_expectedVersion);
+ }
+
+ /**
* Construct a {@link RecordTransformerConfig} for time column transformation
*/
private RecordTransformerConfig getRecordTransformerConfigForTime(String
timeColumnTransformFunction,
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
index b2db61f..7eabbc4 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorFactory.java
@@ -18,9 +18,19 @@
*/
package org.apache.pinot.minion.executor;
+/**
+ * Factory for creating {@link RealtimeToOfflineSegmentsTaskExecutor} tasks
+ */
public class RealtimeToOfflineSegmentsTaskExecutorFactory implements
PinotTaskExecutorFactory {
+
+ private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
+
+ public
RealtimeToOfflineSegmentsTaskExecutorFactory(MinionTaskZkMetadataManager
minionTaskZkMetadataManager) {
+ _minionTaskZkMetadataManager = minionTaskZkMetadataManager;
+ }
+
@Override
public PinotTaskExecutor create() {
- return new RealtimeToOfflineSegmentsTaskExecutor();
+ return new
RealtimeToOfflineSegmentsTaskExecutor(_minionTaskZkMetadataManager);
}
}
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index bd28f79..1b783dc 100644
---
a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++
b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -31,13 +31,13 @@ import org.apache.pinot.core.common.MinionConstants;
public class TaskExecutorFactoryRegistry {
private final Map<String, PinotTaskExecutorFactory>
_taskExecutorFactoryRegistry = new HashMap<>();
  // Registers the built-in task executor factories. The ZK metadata manager is only needed
  // by the RealtimeToOfflineSegmentsTask factory, whose executors read/update the task
  // metadata ZNode (watermark) during pre/post processing.
  public TaskExecutorFactoryRegistry(MinionTaskZkMetadataManager minionTaskZkMetadataManager) {
    registerTaskExecutorFactory(MinionConstants.ConvertToRawIndexTask.TASK_TYPE,
        new ConvertToRawIndexTaskExecutorFactory());
    registerTaskExecutorFactory(MinionConstants.PurgeTask.TASK_TYPE, new PurgeTaskExecutorFactory());
    registerTaskExecutorFactory(MinionConstants.MergeRollupTask.TASK_TYPE, new MergeRollupTaskExecutorFactory());
    registerTaskExecutorFactory(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
        new RealtimeToOfflineSegmentsTaskExecutorFactory(minionTaskZkMetadataManager));
  }
/**
diff --git
a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
index 601c5e4..341f543 100644
---
a/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++
b/pinot-minion/src/test/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -96,15 +96,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_PARTITIONING).setTimeColumnName(T)
.setSegmentPartitionConfig(new
SegmentPartitionConfig(columnPartitionConfigMap)).build();
TableConfig tableConfigWithSortedCol =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T).setSortedColumn(D1)
- .build();
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T)
+ .setSortedColumn(D1).build();
TableConfig tableConfigEpochHours =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX).setSortedColumn(D1)
- .setIngestionConfig(new IngestionConfig(null,
Lists.newArrayList(new TransformConfig(T_TRX, "toEpochHours(t)"))))
- .build();
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
+ .setSortedColumn(D1).setIngestionConfig(
+ new IngestionConfig(null, Lists.newArrayList(new
TransformConfig(T_TRX, "toEpochHours(t)")))).build();
TableConfig tableConfigSDF =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX).setSortedColumn(D1)
- .setIngestionConfig(new IngestionConfig(null,
Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
+ .setSortedColumn(D1).setIngestionConfig(
+ new IngestionConfig(null, Lists.newArrayList(new
TransformConfig(T_TRX, "toDateTime(t, 'yyyyMMddHH')"))))
.build();
Schema schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
@@ -112,12 +113,12 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
.addDateTime(T, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:MILLISECONDS").build();
Schema schemaEpochHours =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
- .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX,
FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS")
- .build();
+ .addMetric(M1, FieldSpec.DataType.INT)
+ .addDateTime(T_TRX, FieldSpec.DataType.INT, "1:HOURS:EPOCH",
"1:HOURS").build();
Schema schemaSDF =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
- .addMetric(M1, FieldSpec.DataType.INT).addDateTime(T_TRX,
FieldSpec.DataType.INT, "1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS")
- .build();
+ .addMetric(M1, FieldSpec.DataType.INT)
+ .addDateTime(T_TRX, FieldSpec.DataType.INT,
"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMddHH", "1:HOURS").build();
List<String> d1 = Lists.newArrayList("foo", "bar", "foo", "foo", "bar");
List<List<GenericRow>> rows = new ArrayList<>(NUM_SEGMENTS);
@@ -213,12 +214,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, "testTable_OFFLINE");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -239,13 +241,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -266,14 +269,15 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY,
"round(t, 86400000)");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -294,15 +298,16 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.TIME_COLUMN_TRANSFORM_FUNCTION_KEY,
"round(t, 86400000)");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
configs.put(M1 +
MinionConstants.RealtimeToOfflineSegmentsTask.AGGREGATION_TYPE_KEY_SUFFIX,
"max");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -326,12 +331,13 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_PARTITIONING);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600468000000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600617600000");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600468000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600617600000");
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -357,13 +363,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_SORTED_COL);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirList, WORKING_DIR);
@@ -384,13 +391,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_EPOCH_HOURS);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirListEpochHours, WORKING_DIR);
@@ -412,13 +420,14 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
FileUtils.deleteQuietly(WORKING_DIR);
RealtimeToOfflineSegmentsTaskExecutor
realtimeToOfflineSegmentsTaskExecutor =
- new RealtimeToOfflineSegmentsTaskExecutor();
+ new RealtimeToOfflineSegmentsTaskExecutor(null);
Map<String, String> configs = new HashMap<>();
configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_SDF);
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MILLIS_KEY,
"1600473600000");
-
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MILLIS_KEY,
"1600560000000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_START_MS_KEY,
"1600473600000");
+
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY,
"1600560000000");
configs.put(MinionConstants.RealtimeToOfflineSegmentsTask.COLLECTOR_TYPE_KEY,
"rollup");
- PinotTaskConfig pinotTaskConfig = new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
+ PinotTaskConfig pinotTaskConfig =
+ new
PinotTaskConfig(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
configs);
List<SegmentConversionResult> conversionResults =
realtimeToOfflineSegmentsTaskExecutor.convert(pinotTaskConfig,
_segmentIndexDirListSDF, WORKING_DIR);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]