DRILL-1540: Provide a fallback data format in case Drill is unable to determine one

+ Added "storageformat" to the ignored-properties list so that workspace configs already stored on existing clusters still deserialize without failure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/958de3d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/958de3d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/958de3d6

Branch: refs/heads/master
Commit: 958de3d62082ac1a70fd2a29c8f4b5eca3a78f91
Parents: ae2790e
Author: Hanifi Gunes <hgu...@maprtech.com>
Authored: Wed Oct 15 19:23:30 2014 -0700
Committer: Aditya Kishore <adi...@maprtech.com>
Committed: Wed Oct 22 18:20:00 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/store/dfs/FileSystemPlugin.java  | 33 +++++++++-----------
 .../drill/exec/store/dfs/WorkspaceConfig.java   | 15 ++++-----
 .../exec/store/dfs/WorkspaceSchemaFactory.java  | 26 ++++++++++-----
 .../resources/bootstrap-storage-plugins.json    |  3 +-
 .../resources/bootstrap-storage-plugins.json    |  3 +-
 .../largefiles/merging_receiver_large_data.json |  6 ++--
 .../resources/writer/simple_csv_writer.json     |  6 ++--
 7 files changed, 47 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index b0855c8..0449db3 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -53,7 +52,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FileSystemPlugin.class);
 
   private final FileSystemSchemaFactory schemaFactory;
-  private Map<String, FormatPlugin> formatsByName;
+  private Map<String, FormatPlugin> formatPluginsByName;
   private Map<FormatPluginConfig, FormatPlugin> formatPluginsByConfig;
   private FileSystemConfig config;
   private DrillbitContext context;
@@ -68,30 +67,28 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
       fsConf.set("fs.drill-local.impl", 
LocalSyncableFileSystem.class.getName());
-      this.fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
-      this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config);
+      fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
+      formatPluginsByName = FormatCreator.getFormatPlugins(context, fs, 
config);
       List<FormatMatcher> matchers = Lists.newArrayList();
       formatPluginsByConfig = Maps.newHashMap();
-      for (FormatPlugin p : formatsByName.values()) {
+      for (FormatPlugin p : formatPluginsByName.values()) {
         matchers.add(p.getMatcher());
         formatPluginsByConfig.put(p.getConfig(), p);
       }
 
-      List<WorkspaceSchemaFactory> factories;
-      if (config.workspaces == null || config.workspaces.isEmpty()) {
-        factories = Collections.singletonList(
-            new WorkspaceSchemaFactory(context.getConfig(), 
context.getPersistentStoreProvider(), this, "default", name, fs, 
WorkspaceConfig.DEFAULT, matchers));
-      } else {
-        factories = Lists.newArrayList();
+      boolean noWorkspace = config.workspaces == null || 
config.workspaces.isEmpty();
+      List<WorkspaceSchemaFactory> factories = Lists.newArrayList();
+      if (!noWorkspace) {
         for (Map.Entry<String, WorkspaceConfig> space : 
config.workspaces.entrySet()) {
           factories.add(new WorkspaceSchemaFactory(context.getConfig(), 
context.getPersistentStoreProvider(), this, space.getKey(), name, fs, 
space.getValue(), matchers));
         }
+      }
 
-        // if the "default" workspace is not given add one.
-        if (!config.workspaces.containsKey("default")) {
-          factories.add(new WorkspaceSchemaFactory(context.getConfig(), 
context.getPersistentStoreProvider(), this, "default", name, fs, 
WorkspaceConfig.DEFAULT, matchers));
-        }
+      // if the "default" workspace is not given add one.
+      if (noWorkspace || !config.workspaces.containsKey("default")) {
+        factories.add(new WorkspaceSchemaFactory(context.getConfig(), 
context.getPersistentStoreProvider(), this, "default", name, fs, 
WorkspaceConfig.DEFAULT, matchers));
       }
+
       this.schemaFactory = new FileSystemSchemaFactory(name, factories);
     } catch (IOException e) {
       throw new ExecutionSetupException("Failure setting up file system 
plugin.", e);
@@ -113,7 +110,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
     FormatSelection formatSelection = selection.getWith(context.getConfig(), 
FormatSelection.class);
     FormatPlugin plugin;
     if (formatSelection.getFormat() instanceof NamedFormatPluginConfig) {
-      plugin = formatsByName.get( ((NamedFormatPluginConfig) 
formatSelection.getFormat()).name);
+      plugin = formatPluginsByName.get( ((NamedFormatPluginConfig) 
formatSelection.getFormat()).name);
     } else {
       plugin = formatPluginsByConfig.get(formatSelection.getFormat());
     }
@@ -129,12 +126,12 @@ public class FileSystemPlugin extends 
AbstractStoragePlugin{
   }
 
   public FormatPlugin getFormatPlugin(String name) {
-    return formatsByName.get(name);
+    return formatPluginsByName.get(name);
   }
 
   public FormatPlugin getFormatPlugin(FormatPluginConfig config) {
     if (config instanceof NamedFormatPluginConfig) {
-      return formatsByName.get(((NamedFormatPluginConfig) config).name);
+      return formatPluginsByName.get(((NamedFormatPluginConfig) config).name);
     } else {
       return formatPluginsByConfig.get(config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
index 2103a96..a6ee545 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceConfig.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 /**
@@ -25,6 +26,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
  *  - writable flag to indicate whether the location supports creating new 
tables.
  *  - default storage format for new tables created in this workspace.
  */
+@JsonIgnoreProperties(value = {"storageformat"})
 public class WorkspaceConfig {
 
   /** Default workspace is a root directory which supports read, but not 
write. */
@@ -32,14 +34,14 @@ public class WorkspaceConfig {
 
   private final String location;
   private final boolean writable;
-  private final String storageformat;
+  private final String defaultInputFormat;
 
   public WorkspaceConfig(@JsonProperty("location") String location,
                          @JsonProperty("writable") boolean writable,
-                         @JsonProperty("storageformat") String storageformat) {
+                         @JsonProperty("defaultInputFormat") String 
defaultInputFormat) {
     this.location = location;
     this.writable = writable;
-    this.storageformat = storageformat;
+    this.defaultInputFormat = defaultInputFormat;
   }
 
   public String getLocation() {
@@ -50,9 +52,8 @@ public class WorkspaceConfig {
     return writable;
   }
 
-  @JsonProperty("storageformat")
-  public String getStorageFormat() {
-    return storageformat;
+  public String getDefaultInputFormat() {
+    return defaultInputFormat;
   }
 
   @Override
@@ -68,6 +69,6 @@ public class WorkspaceConfig {
     WorkspaceConfig that = (WorkspaceConfig) obj;
     return ((this.location == null && that.location == null) || 
this.location.equals(that.location)) &&
         this.writable == that.writable &&
-        ((this.storageformat == null && that.storageformat == null) || 
this.storageformat.equals(that.storageformat));
+        ((this.defaultInputFormat == null && that.defaultInputFormat == null) 
|| this.defaultInputFormat.equals(that.defaultInputFormat));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 68f380c..acd8fcb 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -21,10 +21,14 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import net.hydromatic.optiq.Table;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.dotdrill.DotDrillFile;
@@ -63,7 +67,6 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
   private final String storageEngineName;
   private final String schemaName;
   private final FileSystemPlugin plugin;
-//  private final PTable<String> knownPaths;
 
   private final PStore<String> knownViews;
   private final ObjectMapper mapper;
@@ -84,17 +87,11 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
     // setup cache
     if (storageEngineName == null) {
       this.knownViews = null;
-//      this.knownPaths = null;
     } else {
       this.knownViews = provider.getPStore(PStoreConfig //
           .newJacksonBuilder(drillConfig.getMapper(), String.class) //
           .name(Joiner.on('.').join("storage.views", storageEngineName, 
schemaName)) //
           .build());
-
-//      this.knownPaths = provider.getPTable(PTableConfig //
-//          .newJacksonBuilder(drillConfig.getMapper(), String.class) //
-//          .name(Joiner.on('.').join("storage.cache", storageEngineName, 
schemaName)) //
-//          .build());
     }
 
 
@@ -105,6 +102,19 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
       fileMatchers.add(m);
     }
 
+    // NOTE: Add fallback format matcher if given in the configuration. Make 
sure fileMatchers is an order-preserving list.
+    final String defaultInputFormat = config.getDefaultInputFormat();
+    if (!Strings.isNullOrEmpty(defaultInputFormat)) {
+      final FormatPlugin formatPlugin = 
plugin.getFormatPlugin(defaultInputFormat);
+      if (formatPlugin == null) {
+        final String message = String.format("Unable to find default input 
format[%s] for workspace[%s.%s]",
+            defaultInputFormat, storageEngineName, schemaName);
+        throw new ExecutionSetupException(message);
+      }
+      final FormatMatcher fallbackMatcher = new 
BasicFormatMatcher(formatPlugin, fs,
+          ImmutableList.of(Pattern.compile(".*")), 
ImmutableList.<MagicString>of());
+      fileMatchers.add(fallbackMatcher);
+    }
   }
 
   private Path getViewPath(String name) {
@@ -283,7 +293,7 @@ public class WorkspaceSchemaFactory implements 
ExpandingConcurrentMap.MapValueFa
       FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
       if (formatPlugin == null) {
         throw new UnsupportedOperationException(
-          String.format("Unsupported format '%s' in workspace '%s'", 
config.getStorageFormat(),
+          String.format("Unsupported format '%s' in workspace '%s'", 
config.getDefaultInputFormat(),
               Joiner.on(".").join(getSchemaPath())));
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json 
b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index 31df303..4a20bea 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -10,8 +10,7 @@
         },
         "tmp" : {
           location: "/tmp",
-          writable: true,
-          storageformat: "csv"
+          writable: true
         }
       },
       formats: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json 
b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
index 35ee717..e9772cf 100644
--- a/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/test/resources/bootstrap-storage-plugins.json
@@ -10,8 +10,7 @@
         },
         "tmp" : {
           location: "/tmp/drilltest",
-          writable: true,
-          storageformat: "csv"
+          writable: true
         }
       },
       formats: {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json 
b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
index 902a59e..265486d 100644
--- 
a/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
+++ 
b/exec/java-exec/src/test/resources/largefiles/merging_receiver_large_data.json
@@ -16,13 +16,11 @@
       "workspaces" : {
         "root" : {
           "location" : "/",
-          "writable" : false,
-          "storageformat" : null
+          "writable" : false
         },
         "tmp" : {
           "location" : "/tmp",
-          "writable" : true,
-          "storageformat" : "csv"
+          "writable" : true
         }
       },
       "formats" : {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/958de3d6/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json 
b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
index ff670d5..7980eb5 100644
--- a/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
+++ b/exec/java-exec/src/test/resources/writer/simple_csv_writer.json
@@ -34,13 +34,11 @@
         "workspaces" : {
           "root" : {
             "location" : "/",
-            "writable" : false,
-            "storageformat" : null
+            "writable" : false
           },
           "tmp" : {
             "location" : "/tmp",
-            "writable" : true,
-            "storageformat" : "csv"
+            "writable" : true
           }
         },
         "formats" : {

Reply via email to