This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch pinot_conf_with_env_var in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4b44e3d853fa9aa4f1a65cbf9cd185bcf8c431d3 Author: Xiang Fu <[email protected]> AuthorDate: Mon Nov 16 19:25:17 2020 -0800 Adding config utils to apply environment variables and apply it to table config --- .../pinot/common/metadata/ZKMetadataProvider.java | 5 +- .../org/apache/pinot/spi/config/ConfigUtils.java | 72 +++++++++++ .../apache/pinot/spi/config/ConfigUtilsTest.java | 138 +++++++++++++++++++++ 3 files changed, 214 insertions(+), 1 deletion(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java index 9fa56fa..7cce1eb 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java @@ -36,6 +36,8 @@ import org.apache.pinot.common.utils.SchemaUtils; import org.apache.pinot.common.utils.SegmentName; import org.apache.pinot.common.utils.StringUtil; import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.config.ConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; @@ -219,7 +221,8 @@ public class ZKMetadataProvider { return null; } try { - return TableConfigUtils.fromZNRecord(znRecord); + TableConfig tableConfig = TableConfigUtils.fromZNRecord(znRecord); + return (TableConfig) ConfigUtils.applyConfigWithEnvVariables(tableConfig); } catch (Exception e) { LOGGER.error("Caught exception while getting table configuration for table: {}", tableNameWithType, e); return null; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java new file mode 100644 index 0000000..29a9516 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/ConfigUtils.java @@ -0,0 +1,72 @@ +package org.apache.pinot.spi.config; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import org.apache.pinot.spi.utils.JsonUtils; + + +public class ConfigUtils { + private static final Map<String, String> ENVIRONMENT_VARIABLES = System.getenv(); + + /** + * Apply environment variables to any given BaseJsonConfig. + * + * @return Config with environment variable applied. + */ + public static <T extends BaseJsonConfig> T applyConfigWithEnvVariables(T config) { + JsonNode jsonNode; + try { + jsonNode = applyConfigWithEnvVariables(config.toJsonNode()); + } catch (RuntimeException e) { + throw new RuntimeException(String + .format("Unable to apply environment variables on json config class [%s].", config.getClass().getName()), e); + } + try { + return (T) JsonUtils.jsonNodeToObject(jsonNode, config.getClass()); + } catch (IOException e) { + throw new RuntimeException(String + .format("Unable to read JsonConfig to class [%s] after applying environment variables, jsonConfig is: '%s'.", + config.getClass().getName(), jsonNode.toString()), e); + } + } + + private static JsonNode applyConfigWithEnvVariables(JsonNode jsonNode) { + final JsonNodeType nodeType = jsonNode.getNodeType(); + switch (nodeType) { + case OBJECT: + if (jsonNode.size() > 0) { + Iterator<Map.Entry<String, JsonNode>> iterator = jsonNode.fields(); + while (iterator.hasNext()) { + final Map.Entry<String, JsonNode> next = iterator.next(); + next.setValue(applyConfigWithEnvVariables(next.getValue())); + } + } + break; + case ARRAY: + if (jsonNode.isArray()) { + ArrayNode arrayNode = (ArrayNode) jsonNode; + for (int i = 0; i < arrayNode.size(); i++) { + JsonNode arrayElement = arrayNode.get(i); + arrayNode.set(i, applyConfigWithEnvVariables(arrayElement)); + } + } + break; + case STRING: + final String field = jsonNode.asText(); + if (field.startsWith("${") && field.endsWith("}")) { + final String envVarKey = field.substring(2, field.length() - 1); + if (ENVIRONMENT_VARIABLES.containsKey(envVarKey)) { + return JsonNodeFactory.instance.textNode(ENVIRONMENT_VARIABLES.get(envVarKey)); + } + throw new RuntimeException("Missing environment Variable: " + field); + } + break; + } + return jsonNode; + } +} diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java new file mode 100644 index 0000000..37edd8b --- /dev/null +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/config/ConfigUtilsTest.java @@ -0,0 +1,138 @@ +package org.apache.pinot.spi.config; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.JsonUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class ConfigUtilsTest { + + @Test + public void testIndexing() + throws Exception { + IndexingConfig indexingConfig = new IndexingConfig(); + indexingConfig.setLoadMode("${LOAD_MODE}"); + indexingConfig.setAggregateMetrics(true); + List<String> invertedIndexColumns = Arrays.asList("a", "b", "c"); + indexingConfig.setInvertedIndexColumns(invertedIndexColumns); + List<String> sortedColumn = Arrays.asList("d", "e", "f"); + indexingConfig.setSortedColumn(sortedColumn); + List<String> onHeapDictionaryColumns = Arrays.asList("x", "y", "z"); + indexingConfig.setOnHeapDictionaryColumns(onHeapDictionaryColumns); + List<String> bloomFilterColumns = Arrays.asList("a", "b"); + indexingConfig.setBloomFilterColumns(bloomFilterColumns); + Map<String, String> noDictionaryConfig = new HashMap<>(); + noDictionaryConfig.put("a", "SNAPPY"); + noDictionaryConfig.put("b", "PASS_THROUGH"); + indexingConfig.setNoDictionaryConfig(noDictionaryConfig); + List<String> varLengthDictionaryColumns = Arrays.asList("a", "x", "z"); + indexingConfig.setVarLengthDictionaryColumns(varLengthDictionaryColumns); + + String streamType = "fakeStream"; + String topic = "fakeTopic"; + String consumerType = "simple"; + String tableName = "fakeTable_REALTIME"; + String consumerFactoryClass = "org.apache.pinot.plugin.stream.kafka20.StreamConsumerFactory"; + String decoderClass = "org.apache.pinot.plugin.inputformat.avro.KafkaAvroMessageDecoder"; + + Map<String, String> streamConfigMap = new HashMap<>(); + streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), + topic); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES), + consumerType); + streamConfigMap.put(StreamConfigProperties + .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), + consumerFactoryClass); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS), + decoderClass); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.accessKey"), "${AWS_ACCESS_KEY}"); + streamConfigMap + .put(StreamConfigProperties.constructStreamProperty(streamType, "aws.secretKey"), "${AWS_SECRET_KEY}"); + indexingConfig.setStreamConfigs(streamConfigMap); + + setEnv(ImmutableMap.of("LOAD_MODE", "MMAP", "AWS_ACCESS_KEY", "default_aws_access_key", "AWS_SECRET_KEY", + "default_aws_secret_key")); + + indexingConfig = ConfigUtils.applyConfigWithEnvVariables(indexingConfig); + assertEquals(indexingConfig.getLoadMode(), "MMAP"); + assertTrue(indexingConfig.isAggregateMetrics()); + assertEquals(indexingConfig.getInvertedIndexColumns(), invertedIndexColumns); + assertEquals(indexingConfig.getSortedColumn(), sortedColumn); + assertEquals(indexingConfig.getOnHeapDictionaryColumns(), onHeapDictionaryColumns); + assertEquals(indexingConfig.getBloomFilterColumns(), bloomFilterColumns); + assertEquals(indexingConfig.getNoDictionaryConfig(), noDictionaryConfig); + assertEquals(indexingConfig.getVarLengthDictionaryColumns(), varLengthDictionaryColumns); + + // Mandatory values + defaults + StreamConfig streamConfig = new StreamConfig(tableName, indexingConfig.getStreamConfigs()); + Assert.assertEquals(streamConfig.getType(), streamType); + Assert.assertEquals(streamConfig.getTopicName(), topic); + Assert.assertEquals(streamConfig.getConsumerTypes().get(0), StreamConfig.ConsumerType.LOWLEVEL); + Assert.assertEquals(streamConfig.getConsumerFactoryClassName(), consumerFactoryClass); + Assert.assertEquals(streamConfig.getDecoderClass(), decoderClass); + Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.accessKey"), + "default_aws_access_key"); + Assert.assertEquals(streamConfig.getStreamConfigsMap().get("stream.fakeStream.aws.secretKey"), + "default_aws_secret_key"); + Assert.assertEquals(streamConfig.getDecoderProperties().size(), 0); + Assert + .assertEquals(streamConfig.getOffsetCriteria(), new OffsetCriteria.OffsetCriteriaBuilder().withOffsetLargest()); + Assert + .assertEquals(streamConfig.getConnectionTimeoutMillis(), StreamConfig.DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS); + Assert.assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS); + Assert.assertEquals(streamConfig.getFlushThresholdRows(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS); + Assert.assertEquals(streamConfig.getFlushThresholdTimeMillis(), StreamConfig.DEFAULT_FLUSH_THRESHOLD_TIME_MILLIS); + Assert.assertEquals(streamConfig.getFlushThresholdSegmentSizeBytes(), + StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES); + } + + private static void setEnv(Map<String, String> newEnvVariablsMap) + throws Exception { + try { + Class<?> processEnvironmentClass = Class.forName("java.lang.ProcessEnvironment"); + Field theEnvironmentField = processEnvironmentClass.getDeclaredField("theEnvironment"); + theEnvironmentField.setAccessible(true); + Map<String, String> env = (Map<String, String>) theEnvironmentField.get(null); + env.putAll(newEnvVariablsMap); + Field theCaseInsensitiveEnvironmentField = + processEnvironmentClass.getDeclaredField("theCaseInsensitiveEnvironment"); + theCaseInsensitiveEnvironmentField.setAccessible(true); + Map<String, String> cienv = (Map<String, String>) theCaseInsensitiveEnvironmentField.get(null); + cienv.putAll(newEnvVariablsMap); + } catch (NoSuchFieldException e) { + Class[] classes = Collections.class.getDeclaredClasses(); + Map<String, String> env = System.getenv(); + for (Class cl : classes) { + if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) { + Field field = cl.getDeclaredField("m"); + field.setAccessible(true); + Object obj = field.get(env); + Map<String, String> map = (Map<String, String>) obj; + map.clear(); + map.putAll(newEnvVariablsMap); + } + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
