This is an automated email from the ASF dual-hosted git repository. ihuzenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit d8ed725250111ffa07279b57175c917fa0f3bd06 Author: Arina Ielchiieva <[email protected]> AuthorDate: Tue Jan 28 17:46:49 2020 +0200 DRILL-7549: Fix validation error when querying absent sub folder in embedded mode closes #1963 --- .../common/exceptions/DrillRuntimeException.java | 8 +- .../exec/store/sys/store/LocalPersistentStore.java | 269 +++++++++++++-------- .../exec/store/sys/TestLocalPersistentStore.java | 166 +++++++++++++ 3 files changed, 337 insertions(+), 106 deletions(-) diff --git a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java index b6ced84..c5c7170 100644 --- a/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java +++ b/common/src/main/java/org/apache/drill/common/exceptions/DrillRuntimeException.java @@ -18,7 +18,7 @@ package org.apache.drill.common.exceptions; public class DrillRuntimeException extends RuntimeException { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRuntimeException.class); + private static final long serialVersionUID = -3796081521525479249L; public DrillRuntimeException() { @@ -41,11 +41,11 @@ public class DrillRuntimeException extends RuntimeException { super(cause); } - public static void format(String format, Object...args) { - format(null, format, args); + public static DrillRuntimeException format(String format, Object...args) { + return format(null, format, args); } - public static void format(Throwable cause, String format, Object...args) { + public static DrillRuntimeException format(Throwable cause, String format, Object...args) { throw new DrillRuntimeException(String.format(format, args), cause); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index 5d9e7dc..e8a4e22 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -17,46 +17,53 @@ */ package org.apache.drill.exec.store.sys.store; -import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - -import javax.annotation.Nullable; - import org.apache.commons.io.IOUtils; -import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - -import org.apache.drill.shaded.guava.com.google.common.base.Function; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +/** + * Local persistent store stores its data on the given file system. + * Data is stored in the files with key name as a base and + * {@link org.apache.drill.exec.ExecConstants#DRILL_SYS_FILE_SUFFIX} suffix. + * + * @param <V> store data type + */ public class LocalPersistentStore<V> extends BasePersistentStore<V> { + private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + private static final PathFilter SYS_FILE_SUFFIX_FILTER = path -> path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); + private final Path basePath; + private final PersistentStoreConfig<V> config; + private final DrillFileSystem fs; public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) { @@ -64,25 +71,18 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { this.config = config; this.fs = fs; try { - mkdirs(getBasePath()); + fs.mkdirs(basePath); } catch (IOException e) { - throw new RuntimeException("Failure setting pstore configuration path."); + throw DrillRuntimeException.format(e, "Failure setting local persistent store path [%s]: %s", + basePath, e.getMessage()); } } - protected Path getBasePath() { - return basePath; - } - @Override public PersistentStoreMode getMode() { return PersistentStoreMode.PERSISTENT; } - private void mkdirs(Path path) throws IOException { - fs.mkdirs(path); - } - public static Path getLogDir() { String drillLogDir = System.getenv("DRILL_LOG_DIR"); if (drillLogDir == null) { @@ -109,112 +109,177 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { @Override public Iterator<Map.Entry<String, V>> getRange(int skip, int take) { + List<FileStatus> fileStatuses; try { - // list only files with sys file suffix - PathFilter sysFileSuffixFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return path.getName().endsWith(DRILL_SYS_FILE_SUFFIX); - } - }; - - List<FileStatus> fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, sysFileSuffixFilter); - if (fileStatuses.isEmpty()) { - return Collections.emptyIterator(); - } - - List<String> files = Lists.newArrayList(); - for (FileStatus stat : fileStatuses) { - String s = stat.getPath().getName(); - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); - } - - Collections.sort(files); - - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { - @Nullable - @Override - public Entry<String, V> apply(String key) { - return new ImmutableEntry<>(key, get(key)); - } - }).iterator(); + fileStatuses = DrillFileSystemUtil.listFiles(fs, basePath, false, SYS_FILE_SUFFIX_FILTER); } catch (IOException e) { - throw new RuntimeException(e); + throw DrillRuntimeException.format(e, "Unable to retrieve store data: %s", e.getMessage()); + } + + if (fileStatuses.isEmpty()) { + return Collections.emptyIterator(); } - } - private Path makePath(String name) { - Preconditions.checkArgument( - !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + return fileStatuses.stream() + .map(this::extractKeyName) + .sorted() + .skip(skip) + .limit(take) + .collect(Collectors.toMap( + Function.identity(), + this::get, + (o, n) -> n, + LinkedHashMap::new)) + .entrySet() + .iterator(); } @Override public boolean contains(String key) { - try { - return fs.exists(makePath(key)); - } catch (IOException e) { - throw new RuntimeException(e); - } + Path path = makePath(key, false); + return exists(path); } @Override public V get(String key) { - try { - Path path = makePath(key); - if (!fs.exists(path)) { - return null; - } - } catch (IOException e) { - throw new RuntimeException(e); + Path path = makePath(key, false); + if (!exists(path)) { + return null; } - final Path path = makePath(key); + try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + byte[] bytes = IOUtils.toByteArray(is); + return deserialize(path, bytes); } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + throw DrillRuntimeException.format(e, "Unable to retrieve store data for the path [%s]: %s", + path, e.getMessage()); } } @Override public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { - IOUtils.write(config.getSerializer().serialize(value), os); - } catch (IOException e) { - throw new RuntimeException(e); - } + Path path = makePath(key, true); + put(path, value); } @Override public boolean putIfAbsent(String key, V value) { - try { - Path p = makePath(key); - if (fs.exists(p)) { - return false; - } else { - try (OutputStream os = fs.create(makePath(key))) { - IOUtils.write(config.getSerializer().serialize(value), os); - } - return true; - } - } catch (IOException e) { - throw new RuntimeException(e); + Path path = makePath(key, true); + if (exists(path)) { + return false; } + + put(path, value); + return true; } @Override public void delete(String key) { + Path path = makePath(key, true); try { - fs.delete(makePath(key), false); + fs.delete(path, false); } catch (IOException e) { - logger.error("Unable to delete data from storage.", e); - throw new RuntimeException(e); + throw DrillRuntimeException.format(e, "Unable to delete store data for the path [%s]: %s", + path, e.getMessage()); } } @Override public void close() { } + + /** + * Checks if given key name is valid. Since store data is persisted on the file system, + * key name must not be null or contain any special characters. + * + * @param key key name + * @return true if key name is valid, false otherwise + */ + private boolean isValidKey(String key) { + return key != null + && !key.isEmpty() + && !key.contains(":") + && !key.contains("..") + && !key.contains("/"); + } + + /** + * Constructs path based on given path name. + * If given key is invalid, will fail only if {@code failOnInvalidKey} is passed as true, + * otherwise will return null value. + * + * @param key key name + * @param failOnInvalidKey flag indicating if exception should be on the invalid key + * @return constructed path relevant to the current store configuration + */ + private Path makePath(String key, boolean failOnInvalidKey) { + if (isValidKey(key)) { + try { + return new Path(basePath, key + DRILL_SYS_FILE_SUFFIX); + } catch (IllegalArgumentException e) { + return handleInvalidKey(key, e, failOnInvalidKey); + } + } else { + return handleInvalidKey(key, null, failOnInvalidKey); + } + } + + private Path handleInvalidKey(String key, Throwable throwable, boolean failOnInvalidKey) { + if (failOnInvalidKey) { + throw DrillRuntimeException.format(throwable, "Illegal storage key name: %s", key); + } else { + logger.debug("Illegal storage key name: {}", key, throwable); + return null; + } + } + + private boolean exists(Path path) { + try { + return path != null && fs.exists(path); + } catch (IOException e) { + throw DrillRuntimeException.format(e, "Unable to check store file [%s] existence: %s", + path, e.getMessage()); + } + } + + private byte[] serialize(Path path, V value) { + try { + return config.getSerializer().serialize(value); + } catch (IOException e) { + throw DrillRuntimeException.format(e, "Unable serialize value for the store key [%s]: %s", + path, e.getMessage()); + } + } + + private V deserialize(Path path, byte[] bytes) { + try { + return config.getSerializer().deserialize(bytes); + } catch (IOException e) { + throw DrillRuntimeException.format(e, "Unable deserialize value for the path [%s]: %s", + path, e.getMessage()); + } + } + + private void put(Path path, V value) { + try (OutputStream os = fs.create(path)) { + IOUtils.write(serialize(path, value), os); + } catch (IOException e) { + throw DrillRuntimeException.format(e, "Unable to store data for the path [%s]: %s", + path, e.getMessage()); + } + } + + /** + * Extracts key name from file status. + * Key name is base of the file name where key data is stored. + * {@link org.apache.drill.exec.ExecConstants#DRILL_SYS_FILE_SUFFIX} + * should be removed from the file name to obtain key name. + * + * @param fileStatus file status + * @return key name + */ + private String extractKeyName(FileStatus fileStatus) { + String name = fileStatus.getPath().getName(); + return name.substring(0, name.length() - DRILL_SYS_FILE_SUFFIX.length()); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestLocalPersistentStore.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestLocalPersistentStore.java new file mode 100644 index 0000000..6e773db --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/sys/TestLocalPersistentStore.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.drill.categories.UnlikelyTest; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.test.BaseTest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +@Category({UnlikelyTest.class}) +public class TestLocalPersistentStore extends BaseTest { + + @Rule + public TemporaryFolder root = new TemporaryFolder(); + + private static final PersistentStoreConfig<String> DEFAULT_STORE_CONFIG = PersistentStoreConfig + .newJacksonBuilder(new ObjectMapper(), String.class) + .name("local-test-store") + .build(); + + private static final List<String> ILLEGAL_KEYS = Arrays.asList( + null, "", "/abc", "a/b/c", "abc/", "C:\\abc", "../abc", ".."); + + private static DrillFileSystem fs; + + @BeforeClass + public static void init() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(FileSystem.FS_DEFAULT_NAME_KEY, FileSystem.DEFAULT_FS); + fs = new DrillFileSystem(configuration); + } + + @Test + public void testAbsentGet() throws Exception { + Path path = new Path(root.newFolder("absent-get").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + assertNull(store.get("abc")); + + ILLEGAL_KEYS.stream() + .map(store::get) + .forEach(Assert::assertNull); + } + + @Test + public void testContains() throws Exception { + Path path = new Path(root.newFolder("contains").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + store.put("abc", "desc"); + + ILLEGAL_KEYS.stream() + .map(store::contains) + .forEach(Assert::assertFalse); + + assertFalse(store.contains("a")); + assertTrue(store.contains("abc")); + } + + @Test + public void testPutAndGet() throws Exception { + Path path = new Path(root.newFolder("put-and-get").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + store.put("abc", "desc"); + assertEquals("desc", store.get("abc")); + + store.put("abc", "new-desc"); + assertEquals("new-desc", store.get("abc")); + } + + @Test + public void testIllegalPut() throws Exception { + Path path = new Path(root.newFolder("illegal-put").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + ILLEGAL_KEYS.forEach(key -> { + try { + store.put(key, "desc"); + fail(String.format("Key [%s] should be illegal, put in the store should have failed", key)); + } catch (DrillRuntimeException e) { + assertTrue(e.getMessage().startsWith("Illegal storage key name")); + } + }); + } + + @Test + public void testPutIfAbsent() throws Exception { + Path path = new Path(root.newFolder("put-if-absent").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + assertTrue(store.putIfAbsent("abc", "desc")); + assertFalse(store.putIfAbsent("abc", "new-desc")); + assertEquals("desc", store.get("abc")); + } + + @Test + public void testIllegalPutIfAbsent() throws Exception { + Path path = new Path(root.newFolder("illegal-put-if-absent").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + ILLEGAL_KEYS.forEach(key -> { + try { + store.putIfAbsent(key, "desc"); + fail(String.format("Key [%s] should be illegal, putIfAbsent in the store should have failed", key)); + } catch (DrillRuntimeException e) { + assertTrue(e.getMessage().startsWith("Illegal storage key name")); + } + }); + } + + @Test + public void testRange() throws Exception { + Path path = new Path(root.newFolder("range").toURI().getPath()); + LocalPersistentStore<String> store = new LocalPersistentStore<>(fs, path, DEFAULT_STORE_CONFIG); + + assertEquals(0, Lists.newArrayList(store.getRange(0, 10)).size()); + + IntStream.range(0, 10) + .forEach(i -> store.put("key_" + i, "value_" + i)); + + assertEquals(10, Lists.newArrayList(store.getRange(0, 20)).size()); + assertEquals(10, Lists.newArrayList(store.getRange(0, 10)).size()); + assertEquals(9, Lists.newArrayList(store.getRange(0, 9)).size()); + assertEquals(0, Lists.newArrayList(store.getRange(10, 2)).size()); + assertEquals(5, Lists.newArrayList(store.getRange(2, 5)).size()); + assertEquals(0, Lists.newArrayList(store.getRange(0, 0)).size()); + assertEquals(0, Lists.newArrayList(store.getRange(4, 0)).size()); + } +}
