cgivre commented on a change in pull request #2084:
URL: https://github.com/apache/drill/pull/2084#discussion_r432975423



##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONRecordReader.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class IPFSJSONRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSJSONRecordReader.class);
+
+  public static final long DEFAULT_ROWS_PER_BATCH = 
BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  private FragmentContext fragmentContext;
+  private IPFSContext ipfsContext;
+  private String subScanSpec;
+  private List<SchemaPath> columnList;
+  private JsonProcessor jsonReader;
+  private InputStream stream;
+  private int recordCount;
+  private long runningRecordCount = 0;
+  private final boolean enableAllTextMode;
+  private final boolean enableNanInf;
+  private final boolean enableEscapeAnyChar;
+  private final boolean readNumbersAsDouble;
+  private final boolean unionEnabled;
+  private long parseErrorCount;
+  private final boolean skipMalformedJSONRecords;
+  private final boolean printSkippedMalformedJSONRecordLineNumber;
+  private JsonProcessor.ReadState write = null;
+  private VectorContainerWriter writer;
+
+  public IPFSJSONRecordReader(FragmentContext fragmentContext, IPFSContext 
ipfsContext, String scanSpec, List<SchemaPath> columns) {
+    this.fragmentContext = fragmentContext;
+    this.ipfsContext = ipfsContext;
+    this.subScanSpec = scanSpec;
+    this.columnList = columns;
+    setColumns(columns);
+    this.fragmentContext = fragmentContext;
+    // only enable all text mode if we aren't using embedded content mode.
+    this.enableAllTextMode = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
+    this.enableEscapeAnyChar = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR);
+    this.enableNanInf = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
+    this.readNumbersAsDouble = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+    this.unionEnabled = 
fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+    this.skipMalformedJSONRecords = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
+    this.printSkippedMalformedJSONRecordLineNumber = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
+
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + ", recordCount = " + recordCount
+        + ", parseErrorCount = " + parseErrorCount
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {

Review comment:
       @dbw9580 
   Thanks for submitting this.  As a first step, take a look at the HTTP 
storage plugin.  Your implementation uses the older JSON reader, and the HTTP 
plugin demonstrates how to use the newer version as well as the EVF framework.  
You'll see the code is much simpler.  
   
   
https://github.com/apache/drill/blob/d16e3144c4b51dccd322639711ffc8706f4c2e13/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java#L52-L95.

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+public class IPFSContext {
+  private IPFS ipfsClient;
+  private IPFSHelper ipfsHelper;
+  private IPFSPeer myself;
+  private IPFSStoragePluginConfig storagePluginConfig;
+  private IPFSStoragePlugin storagePlugin;
+  private LoadingCache<Multihash, IPFSPeer> ipfsPeerCache =
+      CacheBuilder.newBuilder()
+                  .maximumSize(1000)
+                  .refreshAfterWrite(10, TimeUnit.MINUTES)
+                  .build(new CacheLoader<Multihash, IPFSPeer>() {
+                    @Override
+                    public IPFSPeer load(Multihash key) {
+                      return new IPFSPeer(getIPFSHelper(), key);
+                    }
+                  });
+
+  public IPFSContext(IPFSStoragePluginConfig config, IPFSStoragePlugin plugin, 
IPFS client) throws IOException {
+    this.ipfsClient = client;
+    this.ipfsHelper = new IPFSHelper(client);
+    this.storagePlugin = plugin;
+    this.storagePluginConfig = config;
+
+    Map res = ipfsHelper.timedFailure(client::id, 
config.getIpfsTimeout(FIND_PEER_INFO));
+    Multihash myID = Multihash.fromBase58((String)res.get("ID"));
+    List<MultiAddress> myAddrs = ((List<String>) res.get("Addresses"))
+        .stream()
+        .map(addr -> new MultiAddress(addr))
+        .collect(Collectors.toList());
+    this.myself = new IPFSPeer(this.ipfsHelper, myID, myAddrs);
+    this.ipfsHelper.setMyself(myself);
+  }
+
+
+  public IPFS getIPFSClient() {
+    return ipfsClient;
+  }
+
+  public IPFSHelper getIPFSHelper() {
+    return ipfsHelper;
+  }
+
+  public IPFSPeer getMyself() {
+    return myself;
+  }
+
+  public IPFSStoragePlugin getStoragePlugin() {
+    return storagePlugin;
+  }
+
+  public IPFSStoragePluginConfig getStoragePluginConfig() {
+    return storagePluginConfig;
+  }
+
+  public LoadingCache<Multihash, IPFSPeer> getIPFSPeerCache() {
+    return ipfsPeerCache;
+  }
+

Review comment:
       Nit: remove space.

##########
File path: contrib/storage-ipfs/pom.xml
##########
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>drill-contrib-parent</artifactId>
+        <groupId>org.apache.drill.contrib</groupId>
+        <version>1.18.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>drill-ipfs-storage</artifactId>
+    <name>contrib/ipfs-storage-plugin</name>
+    <version>0.1.0</version>
+    <properties>
+        <ipfs.TestSuite>**/IPFSTestSuit.class</ipfs.TestSuite>
+    </properties>
+
+    <repositories>

Review comment:
       Is this necessary?

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+public class IPFSContext {
+  private IPFS ipfsClient;
+  private IPFSHelper ipfsHelper;

Review comment:
       Please make as many instance variables `final` as possible. 

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+    static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+    public static final String NAME = "ipfs";
+
+    private final String host;
+    private final int port;
+
+    @JsonProperty("max-nodes-per-leaf")

Review comment:
       JsonProperty not needed here.

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.ipfs.api.IPFS;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import java.io.IOException;
+import java.util.List;
+
+public class IPFSStoragePlugin extends AbstractStoragePlugin {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePlugin.class);
+
+  private final IPFSContext ipfsContext;
+  private final IPFSStoragePluginConfig pluginConfig;
+  private final IPFSSchemaFactory schemaFactory;
+  private final IPFS ipfsClient;
+
+  public IPFSStoragePlugin(IPFSStoragePluginConfig config, DrillbitContext 
context, String name) throws IOException {
+    super(context, name);
+    this.ipfsClient = new IPFS(config.getHost(), config.getPort());
+    this.ipfsContext = new IPFSContext(config, this, ipfsClient);
+    this.schemaFactory = new IPFSSchemaFactory(this.ipfsContext, name);
+    this.pluginConfig = config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  @Override
+  public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection) 
throws IOException {
+    logger.debug("IPFSStoragePlugin before getPhysicalScan");
+    IPFSScanSpec spec = selection.getListWith(new ObjectMapper(), new 
TypeReference<IPFSScanSpec>() {});
+    logger.debug("IPFSStoragePlugin getPhysicalScan with selection {}", 
selection);
+    return new IPFSGroupScan(ipfsContext, spec, null);
+  }
+
+  @Override
+  public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection, 
List<SchemaPath> columns) throws IOException {
+    logger.debug("IPFSStoragePlugin before getPhysicalScan");
+    IPFSScanSpec spec = selection.getListWith(new ObjectMapper(), new 
TypeReference<IPFSScanSpec>() {});
+    logger.debug("IPFSStoragePlugin getPhysicalScan with selection {}, columns 
{}", selection, columns);
+    return new IPFSGroupScan(ipfsContext, spec, columns);
+  }
+
+  public IPFS getIPFSClient() {
+    return ipfsClient;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+    schemaFactory.registerSchemas(schemaConfig, parent);
+  }
+
+  @Override
+  public IPFSStoragePluginConfig getConfig() {
+    return pluginConfig;
+  }
+
+  public IPFSContext getIPFSContext() {
+    return ipfsContext;
+  }
+

Review comment:
       Nit: remove space

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSScanSpec.class);
+
+  public enum Prefix {
+    @JsonProperty("ipfs")
+    IPFS("ipfs"),
+    @JsonProperty("ipns")
+    IPNS("ipns");
+
+    @JsonProperty("prefix")
+    private String name;
+    Prefix(String prefix) {
+      this.name = prefix;
+    }
+
+    @Override
+    public String toString() {

Review comment:
       For any serialized objects, please use the `PlanStringBuilder()` utility 
function. 
   
   
https://github.com/apache/drill/blob/d16e3144c4b51dccd322639711ffc8706f4c2e13/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpScanSpec.java#L75-L83

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSScanSpec.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.io.IOException;
+import java.security.InvalidParameterException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+@JsonTypeName("IPFSScanSpec")
+public class IPFSScanSpec {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSScanSpec.class);

Review comment:
       Please make all `loggers` private static final and remove the full 
module name.   Here and elsewhere.

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSJSONRecordReader.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.easy.json.JsonProcessor;
+import org.apache.drill.exec.store.easy.json.reader.CountingJsonReader;
+import org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+public class IPFSJSONRecordReader extends AbstractRecordReader {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSJSONRecordReader.class);
+
+  public static final long DEFAULT_ROWS_PER_BATCH = 
BaseValueVector.INITIAL_VALUE_ALLOCATION;
+
+  private FragmentContext fragmentContext;
+  private IPFSContext ipfsContext;
+  private String subScanSpec;
+  private List<SchemaPath> columnList;
+  private JsonProcessor jsonReader;
+  private InputStream stream;
+  private int recordCount;
+  private long runningRecordCount = 0;
+  private final boolean enableAllTextMode;
+  private final boolean enableNanInf;
+  private final boolean enableEscapeAnyChar;
+  private final boolean readNumbersAsDouble;
+  private final boolean unionEnabled;
+  private long parseErrorCount;
+  private final boolean skipMalformedJSONRecords;
+  private final boolean printSkippedMalformedJSONRecordLineNumber;
+  private JsonProcessor.ReadState write = null;
+  private VectorContainerWriter writer;
+
+  public IPFSJSONRecordReader(FragmentContext fragmentContext, IPFSContext 
ipfsContext, String scanSpec, List<SchemaPath> columns) {
+    this.fragmentContext = fragmentContext;
+    this.ipfsContext = ipfsContext;
+    this.subScanSpec = scanSpec;
+    this.columnList = columns;
+    setColumns(columns);
+    this.fragmentContext = fragmentContext;
+    // only enable all text mode if we aren't using embedded content mode.
+    this.enableAllTextMode = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR);
+    this.enableEscapeAnyChar = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ESCAPE_ANY_CHAR_VALIDATOR);
+    this.enableNanInf = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR);
+    this.readNumbersAsDouble = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR);
+    this.unionEnabled = 
fragmentContext.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
+    this.skipMalformedJSONRecords = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_SKIP_MALFORMED_RECORDS_VALIDATOR);
+    this.printSkippedMalformedJSONRecordLineNumber = 
fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_PRINT_INVALID_RECORDS_LINE_NOS_FLAG_VALIDATOR);
+
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+        + ", recordCount = " + recordCount
+        + ", parseErrorCount = " + parseErrorCount
+        + ", runningRecordCount = " + runningRecordCount + ", ...]";
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {

Review comment:
       You'll also have to modify the `ScanBatchCreator`. 

##########
File path: contrib/storage-ipfs/README.md
##########
@@ -0,0 +1,182 @@
+# Drill Storage Plugin for IPFS
+
+[中文](README.zh.md)
+
+## Contents
+
+0. [Introduction](#Introduction)
+1. [Compile](#Compile)
+2. [Install](#Install)
+2. [Configuration](#Configuration)
+3. [Run](#Run)
+
+## Introduction
+
+Minerva is a storage plugin for Drill that connects IPFS's decentralized storage and Drill's flexible query engine. Any data file stored on IPFS can be easily accessed from Drill's query interface, just like a file stored on a local disk. Moreover, with Drill's capability of distributed execution, other instances that are also running Minerva can help accelerate the execution: the data stays where it was, and the queries go to the most suitable nodes which store the data locally, and from there the operations can be performed most efficiently. 
+
+Slides that explain our ideas and the technical details of Minerva: 
<https://www.slideshare.net/BowenDing4/minerva-ipfs-storage-plugin-for-ipfs>
+
+A live demo: <http://www.datahub.pub/> hosted on a private cluster of Minerva.
+
+Note that it's still in early stages of development and the overall stability and performance are not satisfactory. PRs are very much welcome!
+
+## Compile
+
+### Dependencies
+
+This project depends on forks of the following projects:
+
+* IPFS Java API: [java-ipfs-api](https://github.com/bdchain/java-ipfs-api)
+
+* Drill 1.16.0:[Drill-fork](https://github.com/bdchain/Drill-fork) 
(`1.16.0-fork` branch)
+
+Please clone and build these projects locally, or the compiler will complain 
about unknown symbols when you compile this project.
+
+### Compile under the Drill source tree
+
+Clone to the `contrib` directory in Drill source tree, e.g. 
`contrib/storage-ipfs`:
+```
+cd drill/contrib/
+git clone https://github.com/bdchain/Minerva.git storage-ipfs
+```
+
+Edit the parent POM of Drill contrib module (contrib/pom.xml), add this plugin 
under `<modules>` section:
+
+```
+<modules>
+    <module>storage-hbase</module>
+    <module>format-maprdb</module>
+    .....
+    <module>storage-ipfs</module>
+</modules>
+```
+
+Build from the root directory of Drill source tree:
+
+```
+mvn -T 2C clean install -DskipTests -Dcheckstyle.skip=true
+```
+
+The jars are in the `storage-ipfs/target` directory.
+
+## Install
+
+The executables and configurations are in 
`distribution/target/apache-drill-1.16.0`. Copy the entire directory to 
somewhere outside the source tree, and name it `drill-run` e.g., for testing 
later.
+
+Copy the `drill-ipfs-storage-{version}.jar` generated jar file to 
`drill-run/jars`.
+
+Copy `java-api-ipfs-v1.2.2.jar` which is IPFS's Java API, along with its 
dependencies provided as jar files:
+
+```
+cid.jar
+junit-4.12.jar
+multiaddr.jar
+multibase.jar
+multihash.jar
+hamcrest-core-1.3.jar
+```
+
+to `drill-run/jars/3rdparty`.
+
+Optionally, copy the configuration override file 
`storage-plugin-override.conf` to `drill-run/conf`, if you want Drill to auto 
configure and enable IPFS storage plugin at every (re)start.
+
+## Configuration
+
+1. Set Drill hostname to the IP address of the node to run Drill:
+    
+    Edit file `conf/drill-env.sh` and change the environment variable 
`DRILL_HOST_NAME` to the IP address of the node. Use private or global 
addresses, depending on whether you plan to run it on a cluster or the open 
Internet.
+
+2. Configure the IPFS storage plugin:
+    
+    If you are not using the configuration override file, you will have to 
manually configure and enable the plugin.
+    
+    Run Drill according to [Section Run](#Run) and go to the webui of Drill 
(can be found at <http://localhost:8047>). Under the Storage tab, create a new 
storage plugin named `ipfs` and click the Create button.
+    
+    Copy and paste the default configuration of the IPFS storage plugin 
located at `storage-ipfs/src/resources/bootstrap-storage-plugins.json`:
+    
+    ```
+    ipfs : {
+        "type":"ipfs",
+        "host": "127.0.0.1",
+        "port": 5001,
+        "max-nodes-per-leaf": 3,
+        "ipfs-timeouts": {
+          "find-provider": 4,
+          "find-peer-info": 4,
+          "fetch-data": 5
+        },
+        "groupscan-worker-threads": 50,
+        "formats": null,
+        "enabled": true
+    }
+    ```
+    
+    where 
+    
+    `host` and `port` are the host and API port where your IPFS daemon will be 
listening. Change it so that it matches the configuration of your IPFS instance.
+
+    `max-nodes-per-leaf` controls how many provider nodes will be considered 
when the query is being planned. A larger value increases the parallelization 
width but typically takes longer to find enough providers from DHT resolution. 
A smaller value does the opposite.
+    
+    `ipfs-timeouts` set the maximum amount of time in seconds for various time 
consuming operations: `find-provider` is the time allowed to do DHT queries to 
find providers, `find-peer-info` is the time allowed to resolve the network 
addresses of the providers and `fetch-data` is the time the actual transmission 
is allowed to take. 
+    
+    `groupscan-worker-threads` limits the number of worker threads used when the planner communicates with the IPFS daemon to resolve providers and peer info.
+    
+    `formats` specifies the formats of the files. It is unimplemented for now 
and does nothing.
+    
+    Click the Update button after finishing editing. You should see the IPFS 
storage plugin is registered with Drill and you can enable it with the Enable 
button.
+    
+3. Configure IPFS
+
+    Start the IPFS daemon first. 
+    
+    Set a Drill-ready flag to the node:
+    
+    ```
+    ipfs name publish $(\
+      ipfs object patch add-link $(ipfs object new) "drill-ready" $(\
+        printf "1" | ipfs object patch set-data $(ipfs object new)\
+      )\
+    )
+    ```
+    
+    This flag indicates that an IPFS node is also capable of handling Drill queries, and the planner will consider it when scheduling a query for distributed execution. A node without this flag will be ignored.
+    
+
+## Run
+
+### Embedded mode
+
+Start IPFS daemon:
+
+```
+ipfs daemon &>/dev/null &
+```
+
+start drill-embedded:
+
+```
+drill-run/bin/drill-embedded
+```
+
+You can now execute queries via the command line as well as the web interface.
+
+### As a background service
+
+You can run drill-embedded as a background process without controlling a 
terminal. This is done with the help of tmux, which is available in many 
distributions of Linux.
+
+Edit the systemd service file `drill-embedded.service`, so that the environment variable `DRILL_HOME` points to where Drill is installed:
+```
+Environment="DRILL_HOME=/home/drill/apache-drill-1.16.0"
+```
+Copy the service file to systemd's configuration directory, e.g. 
`/usr/lib/systemd/system`:
+```
+cp drill-embedded.service /usr/lib/systemd/system
+```
+Reload the systemd daemon:
+```
+systemctl daemon-reload
+```
+Start the service:
+```
+systemctl start drill-embedded.service
+```

Review comment:
       Please create a separate JIRA to add documentation to the Drill website. 
 That does not have to be part of this PR, but otherwise nobody will know about 
this. ;-).

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSContext.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import io.ipfs.api.IPFS;
+import io.ipfs.multiaddr.MultiAddress;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheBuilder;
+import org.apache.drill.shaded.guava.com.google.common.cache.CacheLoader;
+import org.apache.drill.shaded.guava.com.google.common.cache.LoadingCache;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.drill.exec.store.ipfs.IPFSStoragePluginConfig.IPFSTimeOut.FIND_PEER_INFO;
+
+public class IPFSContext {
+  private IPFS ipfsClient;
+  private IPFSHelper ipfsHelper;
+  private IPFSPeer myself;
+  private IPFSStoragePluginConfig storagePluginConfig;
+  private IPFSStoragePlugin storagePlugin;
+  private LoadingCache<Multihash, IPFSPeer> ipfsPeerCache =
+      CacheBuilder.newBuilder()
+                  .maximumSize(1000)
+                  .refreshAfterWrite(10, TimeUnit.MINUTES)
+                  .build(new CacheLoader<Multihash, IPFSPeer>() {
+                    @Override
+                    public IPFSPeer load(Multihash key) {
+                      return new IPFSPeer(getIPFSHelper(), key);
+                    }
+                  });
+
+  public IPFSContext(IPFSStoragePluginConfig config, IPFSStoragePlugin plugin, 
IPFS client) throws IOException {
+    this.ipfsClient = client;
+    this.ipfsHelper = new IPFSHelper(client);
+    this.storagePlugin = plugin;
+    this.storagePluginConfig = config;
+
+    Map res = ipfsHelper.timedFailure(client::id, 
config.getIpfsTimeout(FIND_PEER_INFO));
+    Multihash myID = Multihash.fromBase58((String)res.get("ID"));
+    List<MultiAddress> myAddrs = ((List<String>) res.get("Addresses"))
+        .stream()
+        .map(addr -> new MultiAddress(addr))
+        .collect(Collectors.toList());
+    this.myself = new IPFSPeer(this.ipfsHelper, myID, myAddrs);
+    this.ipfsHelper.setMyself(myself);
+  }
+

Review comment:
       Nit: extra space.

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePlugin.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.ipfs.api.IPFS;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+
+import java.io.IOException;
+import java.util.List;
+
+public class IPFSStoragePlugin extends AbstractStoragePlugin {
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePlugin.class);
+
+  private final IPFSContext ipfsContext;
+  private final IPFSStoragePluginConfig pluginConfig;
+  private final IPFSSchemaFactory schemaFactory;
+  private final IPFS ipfsClient;
+
+  public IPFSStoragePlugin(IPFSStoragePluginConfig config, DrillbitContext 
context, String name) throws IOException {
+    super(context, name);
+    this.ipfsClient = new IPFS(config.getHost(), config.getPort());
+    this.ipfsContext = new IPFSContext(config, this, ipfsClient);
+    this.schemaFactory = new IPFSSchemaFactory(this.ipfsContext, name);
+    this.pluginConfig = config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public boolean supportsWrite() {
+    return true;
+  }
+
+  @Override
+  public IPFSGroupScan getPhysicalScan(String userName, JSONOptions selection) 
throws IOException {

Review comment:
       I'm not sure that is really necessary to have duplicates of this method. 

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+    static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+    public static final String NAME = "ipfs";
+
+    private final String host;
+    private final int port;
+
+    @JsonProperty("max-nodes-per-leaf")
+    private final int maxNodesPerLeaf;
+
+    //TODO add more specific timeout configs fot different operations in IPFS,
+    // eg. provider resolution, data read, etc.
+    @JsonProperty("ipfs-timeouts")
+    private final Map<IPFSTimeOut, Integer> ipfsTimeouts;
+
+    @JsonIgnore
+    private static final Map<IPFSTimeOut, Integer> ipfsTimeoutDefaults = 
ImmutableMap.of(
+        IPFSTimeOut.FIND_PROV, 4,
+        IPFSTimeOut.FIND_PEER_INFO, 4,
+        IPFSTimeOut.FETCH_DATA, 6
+    );
+
+    public enum IPFSTimeOut {
+        @JsonProperty("find-provider")
+        FIND_PROV("find-provider"),
+        @JsonProperty("find-peer-info")
+        FIND_PEER_INFO("find-peer-info"),
+        @JsonProperty("fetch-data")
+        FETCH_DATA("fetch-data");
+
+        @JsonProperty("type")
+        private String which;
+        IPFSTimeOut(String which) {
+            this.which = which;
+        }
+
+        @JsonCreator
+        public static IPFSTimeOut of(String which) {
+            switch (which) {
+                case "find-provider":
+                    return FIND_PROV;
+                case "find-peer-info":
+                    return FIND_PEER_INFO;
+                case "fetch-data":
+                    return FETCH_DATA;
+                default:
+                    throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return this.which;
+        }
+    }
+
+    @JsonProperty("groupscan-worker-threads")
+    private final int numWorkerThreads;
+
+    @JsonProperty
+    private final Map<String, FormatPluginConfig> formats;
+
+    @JsonCreator
+    public IPFSStoragePluginConfig(
+        @JsonProperty("host") String host,
+        @JsonProperty("port") int port,
+        @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+        @JsonProperty("ipfs-timeouts") Map<IPFSTimeOut, Integer> ipfsTimeouts,
+        @JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+        @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
+        this.host = host;
+        this.port = port;
+        this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+        //TODO Jackson failed to deserialize the ipfsTimeouts map causing NPE
+        if (ipfsTimeouts != null) {

Review comment:
       Actually, upon thinking about this, this is a bit more problematic.  You 
really should only include config options that are your standard types like 
ints, Strings etc.  It is possible to include arrays and more complex objects, 
but they all need to ultimately break down into collections of primitives 
and/or Strings. 

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSSubScan.java
##########
@@ -0,0 +1,182 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import io.ipfs.multihash.Multihash;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/*import org.apache.drill.common.expression.SchemaPath;*/
+
+@JsonTypeName("ipfs-sub-scan")
+public class IPFSSubScan extends AbstractBase implements SubScan {
+  private static int IPFS_SUB_SCAN_VALUE = 19155;

Review comment:
       This actually needs to be in the protobuf file.  Don't worry about this 
until we're ready to commit, but there's a procedure where you add it to the 
`UserBitShared.proto`, then build the protobufs and finally the native c 
version.  I'd recommend waiting until everything is done before worrying about 
that. 

##########
File path: 
contrib/storage-ipfs/src/main/java/org/apache/drill/exec/store/ipfs/IPFSStoragePluginConfig.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2018-2020 Bowen Ding, Yuedong Xu, Liang Wang
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.drill.exec.store.ipfs;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+@JsonTypeName(IPFSStoragePluginConfig.NAME)
+public class IPFSStoragePluginConfig extends StoragePluginConfigBase{
+    static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(IPFSStoragePluginConfig.class);
+
+    public static final String NAME = "ipfs";
+
+    private final String host;
+    private final int port;
+
+    @JsonProperty("max-nodes-per-leaf")
+    private final int maxNodesPerLeaf;
+
+    //TODO add more specific timeout configs fot different operations in IPFS,
+    // eg. provider resolution, data read, etc.
+    @JsonProperty("ipfs-timeouts")
+    private final Map<IPFSTimeOut, Integer> ipfsTimeouts;
+
+    @JsonIgnore
+    private static final Map<IPFSTimeOut, Integer> ipfsTimeoutDefaults = 
ImmutableMap.of(
+        IPFSTimeOut.FIND_PROV, 4,
+        IPFSTimeOut.FIND_PEER_INFO, 4,
+        IPFSTimeOut.FETCH_DATA, 6
+    );
+
+    public enum IPFSTimeOut {
+        @JsonProperty("find-provider")
+        FIND_PROV("find-provider"),
+        @JsonProperty("find-peer-info")
+        FIND_PEER_INFO("find-peer-info"),
+        @JsonProperty("fetch-data")
+        FETCH_DATA("fetch-data");
+
+        @JsonProperty("type")
+        private String which;
+        IPFSTimeOut(String which) {
+            this.which = which;
+        }
+
+        @JsonCreator
+        public static IPFSTimeOut of(String which) {
+            switch (which) {
+                case "find-provider":
+                    return FIND_PROV;
+                case "find-peer-info":
+                    return FIND_PEER_INFO;
+                case "fetch-data":
+                    return FETCH_DATA;
+                default:
+                    throw new InvalidParameterException("Unknown key for IPFS 
timeout config entry: " + which);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return this.which;
+        }
+    }
+
+    @JsonProperty("groupscan-worker-threads")
+    private final int numWorkerThreads;
+
+    @JsonProperty
+    private final Map<String, FormatPluginConfig> formats;
+
+    @JsonCreator
+    public IPFSStoragePluginConfig(
+        @JsonProperty("host") String host,
+        @JsonProperty("port") int port,
+        @JsonProperty("max-nodes-per-leaf") int maxNodesPerLeaf,
+        @JsonProperty("ipfs-timeouts") Map<IPFSTimeOut, Integer> ipfsTimeouts,
+        @JsonProperty("groupscan-worker-threads") int numWorkerThreads,
+        @JsonProperty("formats") Map<String, FormatPluginConfig> formats) {
+        this.host = host;
+        this.port = port;
+        this.maxNodesPerLeaf = maxNodesPerLeaf > 0 ? maxNodesPerLeaf : 1;
+        //TODO Jackson failed to deserialize the ipfsTimeouts map causing NPE
+        if (ipfsTimeouts != null) {

Review comment:
       I suspect the reason Jackson won't serialize the `ipfsTimeouts` map is 
that `IPFSTimeOut` is not serializable.  The fix for this is to put a 
`JacksonInject` annotation over this variable, then it won't get serialized.  
   
   Alternatively, if you can create a wrapper class that is serializable and 
use that in the Map, that should work.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to