This is an automated email from the ASF dual-hosted git repository. sunithabeeram pushed a commit to branch virtualColFix in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f6f9c47e1ef978e6d8bdaaf5adda80d7cba3f9ca Author: Sunitha Beeram <[email protected]> AuthorDate: Fri Dec 14 07:17:41 2018 -0800 Fix realtime segment reload --- .../com/linkedin/pinot/common/data/Schema.java | 15 ++++++++ .../helix/ControllerRequestURLBuilder.java | 5 +++ .../defaultcolumn/BaseDefaultColumnHandler.java | 2 +- .../pinot/integration/tests/ClusterTest.java | 15 ++++++-- .../tests/LLCRealtimeClusterIntegrationTest.java | 42 ++++++++++++++++++++++ .../tests/OfflineClusterIntegrationTest.java | 6 ++-- 6 files changed, 79 insertions(+), 6 deletions(-) diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java index 7a6cf7c..f2e6a0c 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/data/Schema.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -268,6 +269,20 @@ public final class Schema { @JsonIgnore @Nonnull + public Set<String> getPhysicalColumnNames() { + Set<String> cols = new HashSet<>(); + cols.addAll(_fieldSpecMap.keySet()); + for (String col : _fieldSpecMap.keySet()) { + // exclude virtual columns + if(isVirtualColumn(col)) { + cols.remove(col); + } + } + return cols; + } + + @JsonIgnore + @Nonnull public Collection<FieldSpec> getAllFieldSpecs() { return _fieldSpecMap.values(); } diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java index 9a79cc3..e7dfa77 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/ControllerRequestURLBuilder.java @@ -154,6 +154,11 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, query); } + public String forTableReload(String tableName, String tableType) { + String query = "reload?type=" + tableType; + return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", query); + } + public String forTableUpdateIndexingConfigs(String tableName) { return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "indexingConfigs"); } diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java index 5abed29..283af0f 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java @@ -150,7 +150,7 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler { Map<String, DefaultColumnAction> defaultColumnActionMap = new HashMap<>(); // Compute ADD and UPDATE actions. - Collection<String> columnsInSchema = _schema.getColumnNames(); + Collection<String> columnsInSchema = _schema.getPhysicalColumnNames(); for (String column : columnsInSchema) { FieldSpec fieldSpecInSchema = _schema.getFieldSpecFor(column); Preconditions.checkNotNull(fieldSpecInSchema); diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java index 7803874..8524196 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/ClusterTest.java @@ -18,6 +18,7 @@ package com.linkedin.pinot.integration.tests; import com.linkedin.pinot.broker.broker.BrokerServerBuilder; import com.linkedin.pinot.broker.broker.BrokerTestUtils; import com.linkedin.pinot.broker.broker.helix.HelixBrokerStarter; +import com.linkedin.pinot.common.config.IndexingConfig; import com.linkedin.pinot.common.config.TableConfig; import com.linkedin.pinot.common.config.TableNameBuilder; import com.linkedin.pinot.common.config.TableTaskConfig; @@ -424,13 +425,23 @@ public abstract class ClusterTest extends ControllerTest { .setTaskConfig(taskConfig) .build(); + // save the realtime table config + _realtimeTableConfig = tableConfig; + if (!isUsingNewConfigFormat()) { sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJSONConfigString()); - } else { - _realtimeTableConfig = tableConfig; } } + protected void updateRealtimeTableConfig(String tablename, List<String> invertedIndexCols, List<String> bloomFilterCols) throws Exception { + + IndexingConfig config = _realtimeTableConfig.getIndexingConfig(); + config.setInvertedIndexColumns(invertedIndexCols); + config.setBloomFilterColumns(bloomFilterCols); + + sendPutRequest(_controllerRequestURLBuilder.forUpdateTableConfig(tablename), _realtimeTableConfig.toJSONConfigString()); + } + protected void dropRealtimeTable(String tableName) throws Exception { sendDeleteRequest( _controllerRequestURLBuilder.forTableDelete(TableNameBuilder.REALTIME.tableNameWithType(tableName))); diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index f962747..d50cc71 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -15,15 +15,20 @@ */ package com.linkedin.pinot.integration.tests; +import com.google.common.base.Function; import com.linkedin.pinot.common.config.TableNameBuilder; import com.linkedin.pinot.common.utils.CommonConstants; +import com.linkedin.pinot.core.indexsegment.generator.SegmentVersion; +import com.linkedin.pinot.util.TestUtils; import java.io.File; +import java.util.Arrays; import java.util.List; import java.util.Random; import org.apache.avro.reflect.Nullable; import org.apache.commons.configuration.Configuration; import org.apache.commons.io.FileUtils; import org.apache.helix.ZNRecord; +import org.json.JSONObject; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -33,6 +38,7 @@ import org.testng.annotations.Test; * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer. */ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { + public static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; public static final long RANDOM_SEED = System.currentTimeMillis(); public static final Random RANDOM = new Random(RANDOM_SEED); @@ -40,6 +46,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio public final boolean _isDirectAlloc = RANDOM.nextBoolean(); public final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); + private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = + "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; + private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = + Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime"); + @BeforeClass @Override public void setUp() throws Exception { @@ -83,6 +94,7 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio @Test public void testSegmentFlushSize() throws Exception { + String zkSegmentsPath = "/SEGMENTS/" + TableNameBuilder.REALTIME.tableNameWithType(getTableName()); List<String> segmentNames = _propertyStore.getChildNames(zkSegmentsPath, 0); for (String segmentName : segmentNames) { @@ -92,4 +104,34 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio "Segment: " + segmentName + " does not have the expected flush size"); } } + + @Test + public void testInvertedIndexTriggering() throws Exception { + + final long numTotalDocs = getCountStarResult(); + + JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY); + Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs); + // TODO: investigate why assert for a specific value fails intermittently + Assert.assertNotSame(queryResponse.getLong("numEntriesScannedInFilter"), 0); + + updateRealtimeTableConfig(getTableName(), UPDATED_INVERTED_INDEX_COLUMNS, null); + + sendPostRequest(_controllerRequestURLBuilder.forTableReload(getTableName(), "realtime"), null); + + TestUtils.waitForCondition(new Function<Void, Boolean>() { + @Override + public Boolean apply(@javax.annotation.Nullable Void aVoid) { + try { + JSONObject queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY); + // Total docs should not change during reload + Assert.assertEquals(queryResponse.getLong("totalDocs"), numTotalDocs); + return queryResponse.getLong("numEntriesScannedInFilter") == 0; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }, 600_000L, "Failed to generate inverted index"); + } } + diff --git a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java index 4e5febf..ad8d86f 100644 --- a/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/com/linkedin/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -50,10 +50,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private static final int NUM_SEGMENTS = 12; // For inverted index triggering test - private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = - Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime"); private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; + private static final List<String> UPDATED_INVERTED_INDEX_COLUMNS = + Arrays.asList("FlightNum", "Origin", "Quarter", "DivActualElapsedTime"); private static final List<String> UPDATED_BLOOM_FLITER_COLUMNS = Arrays.asList("Carrier"); private static final String TEST_UPDATED_BLOOM_FILTER_QUERY = "SELECT COUNT(*) FROM mytable WHERE Carrier = 'CA'"; @@ -200,7 +200,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet throw new RuntimeException(e); } } - }, 600_000L, "Failed to generate inverted index"); + }, 600_000L, "Failed to generate bloomfilter"); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
