DRILL-1540: Provide a fallback data format in case Drill is unable to determine one
+ added "storageformat" to ignored list to avoid failure in existing cluster. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/958de3d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/958de3d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/958de3d6 Branch: refs/heads/master Commit: 958de3d62082ac1a70fd2a29c8f4b5eca3a78f91 Parents: ae2790e Author: Hanifi Gunes <hgu...@maprtech.com> Authored: Wed Oct 15 19:23:30 2014 -0700 Committer: Aditya Kishore <adi...@maprtech.com> Committed: Wed Oct 22 18:20:00 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/store/dfs/FileSystemPlugin.java | 33 +++++++++----------- .../drill/exec/store/dfs/WorkspaceConfig.java | 15 ++++----- .../exec/store/dfs/WorkspaceSchemaFactory.java | 26 ++++++++++----- .../resources/bootstrap-storage-plugins.json | 3 +- .../resources/bootstrap-storage-plugins.json | 3 +- .../largefiles/merging_receiver_large_data.json | 6 ++-- .../resources/writer/simple_csv_writer.json | 6 ++-- 7 files changed, 47 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index b0855c8..0449db3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.dfs; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -53,7 +52,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class); private final FileSystemSchemaFactory schemaFactory; - private Map<String, FormatPlugin> formatsByName; + private Map<String, FormatPlugin> formatPluginsByName; private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig; private FileSystemConfig config; private DrillbitContext context; @@ -68,30 +67,28 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection); fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName()); fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName()); - this.fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf); - this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config); + fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf); + formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, config); List<FormatMatcher> matchers = Lists.newArrayList(); formatPluginsByConfig = Maps.newHashMap(); - for (FormatPlugin p : formatsByName.values()) { + for (FormatPlugin p : formatPluginsByName.values()) { matchers.add(p.getMatcher()); formatPluginsByConfig.put(p.getConfig(), p); } - List<WorkspaceSchemaFactory> factories; - if (config.workspaces == null || config.workspaces.isEmpty()) { - factories = Collections.singletonList( - new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); - } else { - factories = Lists.newArrayList(); + boolean noWorkspace = config.workspaces == null || config.workspaces.isEmpty(); + List<WorkspaceSchemaFactory> factories = Lists.newArrayList(); + if (!noWorkspace) { for (Map.Entry<String, WorkspaceConfig> space : config.workspaces.entrySet()) { factories.add(new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, space.getKey(), name, fs, space.getValue(), matchers)); } + } - // if the "default" workspace is not given add one. - if (!config.workspaces.containsKey("default")) { - factories.add(new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); - } + // if the "default" workspace is not given add one. + if (noWorkspace || !config.workspaces.containsKey("default")) { + factories.add(new WorkspaceSchemaFactory(context.getConfig(), context.getPersistentStoreProvider(), this, "default", name, fs, WorkspaceConfig.DEFAULT, matchers)); } + this.schemaFactory = new FileSystemSchemaFactory(name, factories); } catch (IOException e) { throw new ExecutionSetupException("Failure setting up file system plugin.", e); @@ -113,7 +110,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class); FormatPlugin plugin; if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) { - plugin = formatsByName.get( ((NamedFormatPluginConfig) formatSelection.getFormat()).name); + plugin = formatPluginsByName.get( ((NamedFormatPluginConfig) formatSelection.getFormat()).name); } else { plugin = formatPluginsByConfig.get(formatSelection.getFormat()); } @@ -129,12 +126,12 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ } public FormatPlugin getFormatPlugin(String name) { - return formatsByName.get(name); + return formatPluginsByName.get(name); } public FormatPlugin getFormatPlugin(FormatPluginConfig config) { if (config instanceof NamedFormatPluginConfig) { - return formatsByName.get(((NamedFormatPluginConfig) config).name); + return formatPluginsByName.get(((NamedFormatPluginConfig) config).name); } else { return formatPluginsByConfig.get(config); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java index 2103a96..a6ee545 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.dfs; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; /** @@ -25,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; * - writable flag to indicate whether the location supports creating new tables. * - default storage format for new tables created in this workspace. */ +@JsonIgnoreProperties(value = {"storageformat"}) public class WorkspaceConfig { /** Default workspace is a root directory which supports read, but not write. */ @@ -32,14 +34,14 @@ public class WorkspaceConfig { private final String location; private final boolean writable; - private final String storageformat; + private final String defaultInputFormat; public WorkspaceConfig(@JsonProperty("location") String location, @JsonProperty("writable") boolean writable, - @JsonProperty("storageformat") String storageformat) { + @JsonProperty("defaultInputFormat") String defaultInputFormat) { this.location = location; this.writable = writable; - this.storageformat = storageformat; + this.defaultInputFormat = defaultInputFormat; } public String getLocation() { @@ -50,9 +52,8 @@ public class WorkspaceConfig { return writable; } - @JsonProperty("storageformat") - public String getStorageFormat() { - return storageformat; + public String getDefaultInputFormat() { + return defaultInputFormat; } @Override @@ -68,6 +69,6 @@ public class WorkspaceConfig { WorkspaceConfig that = (WorkspaceConfig) obj; return ((this.location == null && that.location == null) || this.location.equals(that.location)) && this.writable == that.writable && - ((this.storageformat == null && that.storageformat == null) || this.storageformat.equals(that.storageformat)); + ((this.defaultInputFormat == null && that.defaultInputFormat == null) || this.defaultInputFormat.equals(that.defaultInputFormat)); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java index 68f380c..acd8fcb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java @@ -21,10 +21,14 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import net.hydromatic.optiq.Table; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.dotdrill.DotDrillFile; @@ -63,7 +67,6 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa private final String storageEngineName; private final String schemaName; private final FileSystemPlugin plugin; -// private final PTable<String> knownPaths; private final PStore<String> knownViews; private final ObjectMapper mapper; @@ -84,17 +87,11 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa // setup cache if (storageEngineName == null) { this.knownViews = null; -// this.knownPaths = null; } else { this.knownViews = provider.getPStore(PStoreConfig // .newJacksonBuilder(drillConfig.getMapper(), String.class) // .name(Joiner.on('.').join("storage.views", storageEngineName, schemaName)) // .build()); - -// this.knownPaths = provider.getPTable(PTableConfig // -// .newJacksonBuilder(drillConfig.getMapper(), String.class) // -// .name(Joiner.on('.').join("storage.cache", storageEngineName, schemaName)) // -// .build()); } @@ -105,6 +102,19 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa fileMatchers.add(m); } + // NOTE: Add fallback format matcher if given in the configuration. Make sure fileMatchers is an order-preserving list. + final String defaultInputFormat = config.getDefaultInputFormat(); + if (!Strings.isNullOrEmpty(defaultInputFormat)) { + final FormatPlugin formatPlugin = plugin.getFormatPlugin(defaultInputFormat); + if (formatPlugin == null) { + final String message = String.format("Unable to find default input format[%s] for workspace[%s.%s]", + defaultInputFormat, storageEngineName, schemaName); + throw new ExecutionSetupException(message); + } + final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin, fs, + ImmutableList.of(Pattern.compile(".*")), ImmutableList.<MagicString>of()); + fileMatchers.add(fallbackMatcher); + } } private Path getViewPath(String name) { @@ -283,7 +293,7 @@ public class WorkspaceSchemaFactory implements ExpandingConcurrentMap.MapValueFa FormatPlugin formatPlugin = plugin.getFormatPlugin(storage); if (formatPlugin == null) { throw new UnsupportedOperationException( - String.format("Unsupported format '%s' in workspace '%s'", config.getStorageFormat(), + String.format("Unsupported format '%s' in workspace '%s'", config.getDefaultInputFormat(), Joiner.on(".").join(getSchemaPath()))); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 31df303..4a20bea 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -10,8 +10,7 @@ }, "tmp" : { location: "/tmp", - writable: true, - storageformat: "csv" + writable: true } }, formats: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json index 35ee717..e9772cf 100644 --- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json @@ -10,8 +10,7 @@ }, "tmp" : { location: "/tmp/drilltest", - writable: true, - storageformat: "csv" + writable: true } }, formats: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json index 902a59e..265486d 100644 --- a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json +++ b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json @@ -16,13 +16,11 @@ "workspaces" : { "root" : { "location" : "/", - "writable" : false, - "storageformat" : null + "writable" : false }, "tmp" : { "location" : "/tmp", - "writable" : true, - "storageformat" : "csv" + "writable" : true } }, "formats" : { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/writer/simple_csv_writer.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json index ff670d5..7980eb5 100644 --- a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json +++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json @@ -34,13 +34,11 @@ "workspaces" : { "root" : { "location" : "/", - "writable" : false, - "storageformat" : null + "writable" : false }, "tmp" : { "location" : "/tmp", - "writable" : true, - "storageformat" : "csv" + "writable" : true } }, "formats" : {