This is an automated email from the ASF dual-hosted git repository. boaz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit edbfd64c4b7d8b7af406a507e959e5856185eaea Author: Anton Gozhiy <[email protected]> AuthorDate: Mon Apr 15 19:24:11 2019 +0300 DRILL-7030: Make format plugins fully pluggable - Bootstrap files for format plugins were introduced and added to the existing plugins in contrib. - Formats from these files are being added dynamically to the corresponding storage plugins. closes #1780 --- .../main/resources/bootstrap-format-plugins.json | 26 +++++++ .../main/resources/bootstrap-format-plugins.json | 20 +++++ .../main/resources/bootstrap-format-plugins.json | 26 +++++++ .../java/org/apache/drill/exec/ExecConstants.java | 1 + .../exec/store/StoragePluginRegistryImpl.java | 88 ++++++++++++++++++---- .../drill/exec/store/dfs/FileSystemConfig.java | 3 +- 6 files changed, 148 insertions(+), 16 deletions(-) diff --git a/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 0000000..3dda8cf --- /dev/null +++ b/contrib/format-ltsv/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,26 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "ltsv": { + "type": "ltsv", + "extensions": [ + "ltsv" + ] + } + } + }, + "s3": { + "type": "file", + "formats": { + "ltsv": { + "type": "ltsv", + "extensions": [ + "ltsv" + ] + } + } + } + } +} diff --git a/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 0000000..a126709 --- /dev/null +++ b/contrib/format-maprdb/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,20 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "maprdb": { + "type": "maprdb" + } + } + }, + "s3": { + "type": "file", + "formats": { + "maprdb": { + "type": "maprdb" + } + } + } + } +} diff --git a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json new file mode 100644 index 0000000..ee5a396 --- /dev/null +++ b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json @@ -0,0 +1,26 @@ +{ + "storage":{ + "dfs": { + "type": "file", + "formats": { + "syslog": { + "type": "syslog", + "extensions": [ + "syslog" + ] + } + } + }, + "s3": { + "type": "file", + "formats": { + "syslog": { + "type": "syslog", + "extensions": [ + "syslog" + ] + } + } + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 3503507..93c9902 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -726,6 +726,7 @@ public final class ExecConstants { new OptionDescription("Min width for text readers, mostly for testing.")); public static final String BOOTSTRAP_STORAGE_PLUGINS_FILE = "bootstrap-storage-plugins.json"; + public static final String BOOTSTRAP_FORMAT_PLUGINS_FILE = "bootstrap-format-plugins.json"; public static final String SKIP_RUNTIME_ROWGROUP_PRUNING_KEY = "exec.storage.skip_runtime_rowgroup_pruning"; public static final OptionValidator SKIP_RUNTIME_ROWGROUP_PRUNING = new BooleanValidator(SKIP_RUNTIME_ROWGROUP_PRUNING_KEY, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java index c5554f8..4718a20 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java @@ -36,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.LogicalPlanPersistence; @@ -300,7 +301,8 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { } /** - * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh + * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} + * and format plugins {@link ExecConstants#BOOTSTRAP_FORMAT_PLUGINS_FILE} files for the first fresh * instantiating of Drill * * @param lpPersistence deserialization mapper provider @@ -310,22 +312,19 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { private StoragePlugins loadBootstrapPlugins(LogicalPlanPersistence lpPersistence) throws IOException { // bootstrap load the config since no plugins are stored. logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration."); - Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false); - if (urls != null && !urls.isEmpty()) { - logger.info("Loading the storage plugin configs from URLs {}.", urls); + Set<URL> storageUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false); + Set<URL> formatUrls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_FORMAT_PLUGINS_FILE, false); + if (storageUrls != null && !storageUrls.isEmpty()) { + logger.info("Loading the storage plugin configs from URLs {}.", storageUrls); StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>()); Map<String, URL> pluginURLMap = new HashMap<>(); - for (URL url : urls) { - String pluginsData = Resources.toString(url, Charsets.UTF_8); - StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class); - for (Entry<String, StoragePluginConfig> plugin : plugins) { - StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue()); - if (oldPluginConfig != null) { - logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.", - plugin.getKey(), pluginURLMap.get(plugin.getKey()), url); - } else { - pluginURLMap.put(plugin.getKey(), url); - } + for (URL url : storageUrls) { + loadStoragePlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence); + } + if (formatUrls != null && !formatUrls.isEmpty()) { + logger.info("Loading the format plugin configs from URLs {}.", formatUrls); + for (URL url : formatUrls) { + loadFormatPlugins(url, bootstrapPlugins, pluginURLMap, lpPersistence); } } return bootstrapPlugins; @@ -335,6 +334,65 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { } /** + * Loads storage plugins from the given URL + * + * @param url URL to the storage plugins bootstrap file + * @param bootstrapPlugins a collection where the plugins should be loaded to + * @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging + * @param lpPersistence need to get an object mapper for the bootstrap files + * @throws IOException if failed to retrieve a plugin from a bootstrap file + */ + private void loadStoragePlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException { + StoragePlugins plugins = getPluginsFromResource(url, lpPersistence); + plugins.forEach(plugin -> { + StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue()); + if (oldPluginConfig != null) { + logger.warn("Duplicate plugin instance '[{}]' defined in [{}, {}], ignoring the later one.", + plugin.getKey(), pluginURLMap.get(plugin.getKey()), url); + } else { + pluginURLMap.put(plugin.getKey(), url); + } + }); + } + + /** + * Loads format plugins from the given URL and adds the formats to the specified storage plugins + * + * @param url URL to the format plugins bootstrap file + * @param bootstrapPlugins a collection with loaded storage plugins. New formats will be added to them + * @param pluginURLMap a map to store correspondence between storage plugins and bootstrap files in which they are defined. Used for logging + * @param lpPersistence need to get an object mapper for the bootstrap files + * @throws IOException if failed to retrieve a plugin from a bootstrap file + */ + private void loadFormatPlugins(URL url, StoragePlugins bootstrapPlugins, Map<String, URL> pluginURLMap, LogicalPlanPersistence lpPersistence) throws IOException { + StoragePlugins plugins = getPluginsFromResource(url, lpPersistence); + plugins.forEach(formatPlugin -> { + String targetStoragePluginName = formatPlugin.getKey(); + StoragePluginConfig storagePlugin = bootstrapPlugins.getConfig(targetStoragePluginName); + StoragePluginConfig formatPluginValue = formatPlugin.getValue(); + if (storagePlugin == null) { + logger.warn("No storage plugins with the given name are registered: '[{}]'", targetStoragePluginName); + } else if (storagePlugin instanceof FileSystemConfig && formatPluginValue instanceof FileSystemConfig) { + FileSystemConfig targetPlugin = (FileSystemConfig) storagePlugin; + ((FileSystemConfig) formatPluginValue).getFormats().forEach((formatName, formatValue) -> { + FormatPluginConfig oldPluginConfig = targetPlugin.getFormats().putIfAbsent(formatName, formatValue); + if (oldPluginConfig != null) { + logger.warn("Duplicate format instance '[{}]' defined in [{}, {}], ignoring the later one.", + formatName, pluginURLMap.get(targetStoragePluginName), url); + } + }); + } else { + logger.warn("Formats are only supported by File System plugin type: '[{}]'", targetStoragePluginName); + } + }); + } + + private StoragePlugins getPluginsFromResource(URL resource, LogicalPlanPersistence lpPersistence) throws IOException { + String pluginsData = Resources.toString(resource, Charsets.UTF_8); + return lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class); + } + + /** * Dynamically loads system plugins annotated with {@link SystemPlugin}. * Will skip plugin initialization if no matching constructor, incorrect class implementation, name absence are detected. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java index 4b75e33..58f69a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.dfs; +import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -48,7 +49,7 @@ public class FileSystemConfig extends StoragePluginConfig { Map<String, WorkspaceConfig> caseInsensitiveWorkspaces = CaseInsensitiveMap.newHashMap(); Optional.ofNullable(workspaces).ifPresent(caseInsensitiveWorkspaces::putAll); this.workspaces = caseInsensitiveWorkspaces; - this.formats = formats; + this.formats = formats != null ? formats : new LinkedHashMap<>(); } @JsonProperty
