http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java index a597381..723dbb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStore.java @@ -17,23 +17,13 @@ */ package org.apache.drill.exec.store.sys.zk; -import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; - -import java.io.FileNotFoundException; import java.io.IOException; -import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; -import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; -import org.apache.drill.exec.store.dfs.shim.DrillInputStream; -import org.apache.drill.exec.store.dfs.shim.DrillOutputStream; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.hadoop.fs.Path; import org.apache.zookeeper.CreateMode; -import com.google.common.base.Preconditions; - /** * Implementation of PStore using Zookeeper's PERSISTENT node. * @param <V>
http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java index f8fa2bc..f0fa120 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/zk/ZkPStoreProvider.java @@ -24,7 +24,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; -import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; +import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.sys.EStoreProvider; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreConfig; http://git-wip-us.apache.org/repos/asf/drill/blob/c54bd6ac/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java new file mode 100644 index 0000000..5c71d08 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.dfs; + +import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.InputStream; +import java.io.PrintWriter; + +import static org.junit.Assert.assertTrue; + +public class TestDrillFileSystem { + + private static String tempFilePath; + + @BeforeClass + public static void createTempFile() throws Exception { + + File tempFile; + while (true) { + tempFile = File.createTempFile("drillFSTest", ".txt"); + if (tempFile.exists()) { + boolean success = tempFile.delete(); + if (success) { + break; + } + } + } + + // Write some data + PrintWriter printWriter = new PrintWriter(tempFile); + for (int i=1; i<=200000; i++) { + printWriter.println (String.format("%d, key_%d", i, i)); + } + printWriter.close(); + + tempFilePath = tempFile.getPath(); + } + + @Test + public void testIOStats() throws Exception { + Configuration conf = new Configuration(); + conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/); + OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/); + // start wait time method in OperatorStats expects the OperatorStats state to be in "processing" + stats.startProcessing(); + DrillFileSystem dfs = new DrillFileSystem(FileSystem.get(conf), stats); + + InputStream is = dfs.open(new Path(tempFilePath)); + + byte[] buf = new byte[8000]; + while (is.read(buf, 0, buf.length) != -1) { } + + stats.stopProcessing(); + + OperatorProfile operatorProfile = stats.getProfile(); + + assertTrue("Expected wait time is non-zero, but got zero wait time", operatorProfile.getWaitNanos() > 0); + } + + @AfterClass + public static void deleteTempFile() throws Exception { + new File(tempFilePath).delete(); + } + +}