Repository: beam
Updated Branches:
  refs/heads/master 49809d1d4 -> bea101a44


[BEAM-59] Beam FileSystem: match() and its local implementation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d1648c47
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d1648c47
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d1648c47

Branch: refs/heads/master
Commit: d1648c47dd4fef00273ceb46d42d784325b3b1e8
Parents: 49809d1
Author: Pei He <pe...@google.com>
Authored: Fri Feb 10 21:53:31 2017 -0800
Committer: Pei He <pe...@google.com>
Committed: Mon Feb 13 22:08:50 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/FileSystem.java |  30 ++++
 .../org/apache/beam/sdk/io/LocalFileSystem.java |  74 ++++++++++
 .../org/apache/beam/sdk/io/fs/MatchResult.java  | 125 ++++++++++++++++
 .../apache/beam/sdk/io/LocalFileSystemTest.java | 148 +++++++++++++++++++
 .../beam/sdk/util/FileIOChannelFactoryTest.java |  13 +-
 .../beam/sdk/io/gcp/storage/GcsFileSystem.java  |   6 +
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      |   6 +
 7 files changed, 391 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
index ecfa29b..001f596 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java
@@ -24,6 +24,7 @@ import java.nio.channels.WritableByteChannel;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.ResourceId;
 
 /**
@@ -35,6 +36,35 @@ import org.apache.beam.sdk.io.fs.ResourceId;
  * Clients should use {@link FileSystems} utility.
  */
 public abstract class FileSystem<ResourceIdT extends ResourceId> {
+  /**
+   * This is the entry point to convert user-provided specs to {@link 
ResourceIdT ResourceIds}.
+   * Callers should use {@link #match} to resolve ambiguities in user-provided 
specs before
+   * calling other methods.
+   *
+   * <p>Implementation should handle the following ambiguities of a 
user-provided spec:
+   * <ol>
+   * <li>{@code spec} could be a glob or a URI. {@link #match} should be able 
to tell them apart and
+   * choose an efficient implementation for each.
+   * <li>The user-provided {@code spec} might refer to files or directories. 
It is common that
+   * users who wish to indicate a directory will omit the trailing {@code /}, 
such as in a spec of
+   * {@code "/tmp/dir"}. The {@link FileSystem} should be able to recognize a 
directory with
+   * the trailing {@code /} omitted, but should always return a correct {@link 
ResourceIdT}
+   * (e.g., {@code "/tmp/dir/"}) inside the returned {@link MatchResult}.
+   * </ol>
+   *
+   * <p>All {@link FileSystem} implementations should support glob in the 
final hierarchical path
+   * component of {@link ResourceIdT}. This allows SDK libraries to construct 
file-system-agnostic
+   * specs. {@link FileSystem FileSystems} can support additional patterns for 
user-provided specs.
+   *
+   * @return {@code List<MatchResult>} in the same order as the input specs.
+   *
+   * @throws IllegalArgumentException if specs are invalid.
+   * @throws IOException if all specs fail to match due to issues such as
+   * network connection or authorization failures.
+   * Exceptions for individual specs need to be deferred until callers retrieve
+   * metadata with {@link MatchResult#metadata()}.
+   */
+  protected abstract List<MatchResult> match(List<String> specs) throws 
IOException;
 
   /**
    * Returns a write channel for the given {@link ResourceIdT}.

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 0e79c9c..fe6b643 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -19,6 +19,11 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -29,10 +34,16 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Matcher;
 import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,10 +54,21 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(LocalFileSystem.class);
 
+  private static final Metadata[] EMPTY_METADATA = new Metadata[0];
+
   LocalFileSystem() {
   }
 
   @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    ImmutableList.Builder<MatchResult> ret = ImmutableList.builder();
+    for (String spec : specs) {
+      ret.add(matchOne(spec));
+    }
+    return ret.build();
+  }
+
+  @Override
   protected WritableByteChannel create(LocalResourceId resourceId, 
CreateOptions createOptions)
       throws IOException {
     LOG.debug("creating file {}", resourceId);
@@ -143,4 +165,56 @@ class LocalFileSystem extends FileSystem<LocalResourceId> {
       }
     }
   }
+
+  private MatchResult matchOne(String spec) throws IOException {
+    File file = Paths.get(spec).toFile();
+
+    if (file.exists()) {
+      return MatchResult.create(Status.OK, new Metadata[]{toMetadata(file)});
+    }
+
+    File parent = file.getAbsoluteFile().getParentFile();
+    if (!parent.exists()) {
+      return MatchResult.create(Status.NOT_FOUND, EMPTY_METADATA);
+    }
+
+    // Method getAbsolutePath() on Windows platform may return something like
+    // "c:\temp\file.txt". FileSystem.getPathMatcher() call below will treat
+    // '\' (backslash) as an escape character, instead of a directory
+    // separator. Replacing backslash with double-backslash solves the problem.
+    // We perform the replacement on all platforms, even those that allow
+    // backslash as a part of the filename, because Globs.toRegexPattern will
+    // eat one backslash.
+    String pathToMatch = 
file.getAbsolutePath().replaceAll(Matcher.quoteReplacement("\\"),
+        Matcher.quoteReplacement("\\\\"));
+
+    final PathMatcher matcher =
+        java.nio.file.FileSystems.getDefault().getPathMatcher("glob:" + 
pathToMatch);
+
+    // TODO: Avoid iterating all files: 
https://issues.apache.org/jira/browse/BEAM-1309
+    Iterable<File> files = 
com.google.common.io.Files.fileTreeTraverser().preOrderTraversal(parent);
+    Iterable<File> matchedFiles = Iterables.filter(files,
+        Predicates.and(
+            com.google.common.io.Files.isFile(),
+            new Predicate<File>() {
+              @Override
+              public boolean apply(File input) {
+                return matcher.matches(input.toPath());
+              }
+            }));
+
+    List<Metadata> result = Lists.newLinkedList();
+    for (File match : matchedFiles) {
+      result.add(toMetadata(match));
+    }
+    return MatchResult.create(Status.OK, result.toArray(new 
Metadata[result.size()]));
+  }
+
+  private Metadata toMetadata(File file) {
+    return Metadata.builder()
+        .setResourceId(LocalResourceId.fromPath(file.toPath(), 
file.isDirectory()))
+        .setIsReadSeekEfficient(true)
+        .setSizeBytes(file.length())
+        .build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
new file mode 100644
index 0000000..80ee00f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+
+/**
+ * The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
+ */
+public abstract class MatchResult {
+
+  private MatchResult() {}
+
+  /**
+   * Returns a {@link MatchResult} given the {@link Status} and {@link 
Metadata}.
+   */
+  public static MatchResult create(final Status status, final Metadata[] 
metadata) {
+    return new MatchResult() {
+      @Override
+      public Status status() {
+        return status;
+      }
+
+      @Override
+      public Metadata[] metadata() throws IOException {
+        return metadata;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link MatchResult} given the {@link Status} and {@link 
IOException}.
+   */
+  public static MatchResult create(final Status status, final IOException e) {
+    return new MatchResult() {
+      @Override
+      public Status status() {
+        return status;
+      }
+
+      @Override
+      public Metadata[] metadata() throws IOException {
+        throw e;
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link MatchResult} with {@link Status#UNKNOWN}.
+   */
+  public static MatchResult unknown() {
+    return new MatchResult() {
+      @Override
+      public Status status() {
+        return Status.UNKNOWN;
+      }
+
+      @Override
+      public Metadata[] metadata() throws IOException {
+        throw new IOException("MatchResult status is UNKNOWN, and metadata is 
not available.");
+      }
+    };
+  }
+
+  /**
+   * Status of the {@link MatchResult}.
+   */
+  public abstract Status status();
+
+  /**
+   * {@link Metadata} of matched files.
+   */
+  public abstract Metadata[] metadata() throws IOException;
+
+  /**
+   * {@link Metadata} of a matched file.
+   */
+  @AutoValue
+  public abstract static class Metadata {
+    public abstract ResourceId resourceId();
+    public abstract long sizeBytes();
+    public abstract boolean isReadSeekEfficient();
+
+    public static Builder builder() {
+      return new AutoValue_MatchResult_Metadata.Builder();
+    }
+
+    /**
+     * Builder class for {@link Metadata}.
+     */
+    @AutoValue.Builder
+    public abstract static class Builder {
+      public abstract Builder setResourceId(ResourceId value);
+      public abstract Builder setSizeBytes(long value);
+      public abstract Builder setIsReadSeekEfficient(boolean value);
+      public abstract Metadata build();
+    }
+  }
+
+  /**
+   * Status of a {@link MatchResult}.
+   */
+  public enum Status {
+    UNKNOWN,
+    OK,
+    NOT_FOUND,
+    ERROR,
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index ad9b8a0..74f8b72 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -30,15 +30,21 @@ import com.google.common.io.Files;
 import com.google.common.io.LineReader;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
 import java.nio.channels.Channels;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.util.MimeTypes;
+import org.hamcrest.Matchers;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -147,6 +153,122 @@ public class LocalFileSystemTest {
     }
   }
 
+  @Test
+  public void testMatchExact() throws Exception {
+    List<String> expected = 
ImmutableList.of(temporaryFolder.newFile("a").toString());
+    temporaryFolder.newFile("aa");
+    temporaryFolder.newFile("ab");
+
+    List<MatchResult> matchResults = localFileSystem.match(
+        
ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString()));
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchPatternNone() throws Exception {
+    List<String> expected = ImmutableList.of();
+    temporaryFolder.newFile("a");
+    temporaryFolder.newFile("aa");
+    temporaryFolder.newFile("ab");
+
+    List<MatchResult> matchResults =
+        
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("b"), "*");
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchForNonExistentFile() throws Exception {
+    List<String> expected = ImmutableList.of();
+    temporaryFolder.newFile("aa");
+
+    List<MatchResult> matchResults = localFileSystem.match(
+        
ImmutableList.of(temporaryFolder.getRoot().toPath().resolve("a").toString()));
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchMultipleWithFileExtension() throws Exception {
+    List<String> expected = ImmutableList.of(
+        temporaryFolder.newFile("a.txt").toString(),
+        temporaryFolder.newFile("aa.txt").toString(),
+        temporaryFolder.newFile("ab.txt").toString());
+    temporaryFolder.newFile("a.avro");
+    temporaryFolder.newFile("ab.avro");
+
+    List<MatchResult> matchResults =
+        
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), 
"*.txt");
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchMultipleWithoutSubdirectoryExpansion() throws Exception 
{
+    File unmatchedSubDir = temporaryFolder.newFolder("aaa");
+    File unmatchedSubDirFile = File.createTempFile("sub-dir-file", "", 
unmatchedSubDir);
+    unmatchedSubDirFile.deleteOnExit();
+    List<String> expected = 
ImmutableList.of(temporaryFolder.newFile("a").toString(),
+        temporaryFolder.newFile("aa").toString(), 
temporaryFolder.newFile("ab").toString());
+    temporaryFolder.newFile("ba");
+    temporaryFolder.newFile("bb");
+
+    List<MatchResult> matchResults =
+        
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "*");
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchMultipleWithSubdirectoryExpansion() throws Exception {
+    File matchedSubDir = temporaryFolder.newFolder("a");
+    File matchedSubDirFile = File.createTempFile("sub-dir-file", "", 
matchedSubDir);
+    matchedSubDirFile.deleteOnExit();
+    File unmatchedSubDir = temporaryFolder.newFolder("b");
+    File unmatchedSubDirFile = File.createTempFile("sub-dir-file", "", 
unmatchedSubDir);
+    unmatchedSubDirFile.deleteOnExit();
+
+    List<String> expected = ImmutableList.of(matchedSubDirFile.toString(),
+        temporaryFolder.newFile("aa").toString(), 
temporaryFolder.newFile("ab").toString());
+    temporaryFolder.newFile("ba");
+    temporaryFolder.newFile("bb");
+
+    List<MatchResult> matchResults =
+        
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "**");
+    assertThat(
+        toFilenames(matchResults),
+        Matchers.hasItems(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchWithDirectoryFiltersOutDirectory() throws Exception {
+    List<String> expected = 
ImmutableList.of(temporaryFolder.newFile("a").toString());
+    temporaryFolder.newFolder("a_dir_that_should_not_be_matched");
+
+    List<MatchResult> matchResults =
+        
matchGlobWithPathPrefix(temporaryFolder.getRoot().toPath().resolve("a"), "*");
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
+  public void testMatchWithoutParentDirectory() throws Exception {
+    Path pattern = LocalResourceId
+        .fromPath(temporaryFolder.getRoot().toPath(), true /* isDirectory */)
+        .resolve("non_existing_dir", StandardResolveOptions.RESOLVE_DIRECTORY)
+        .resolve("*", StandardResolveOptions.RESOLVE_FILE)
+        .getPath();
+    assertTrue(
+        
toFilenames(localFileSystem.match(ImmutableList.of(pattern.toString()))).isEmpty());
+  }
+
   private void createFileWithContent(Path path, String content) throws 
Exception {
     try (Writer writer = Channels.newWriter(
         localFileSystem.create(
@@ -157,6 +279,12 @@ public class LocalFileSystemTest {
     }
   }
 
+  private List<MatchResult> matchGlobWithPathPrefix(Path pathPrefix, String 
glob)
+      throws IOException {
+    // Windows doesn't like resolving paths with * in glob, so the glob is 
concatenated as String.
+    return localFileSystem.match(ImmutableList.of(pathPrefix + glob));
+  }
+
   private List<LocalResourceId> toLocalResourceIds(List<Path> paths, final 
boolean isDirectory) {
     return FluentIterable
         .from(paths)
@@ -167,4 +295,24 @@ public class LocalFileSystemTest {
           }})
         .toList();
   }
+
+  private List<String> toFilenames(List<MatchResult> matchResults) {
+    return FluentIterable
+        .from(matchResults)
+        .transformAndConcat(new Function<MatchResult, Iterable<Metadata>>() {
+          @Override
+          public Iterable<Metadata> apply(MatchResult matchResult) {
+            try {
+              return Arrays.asList(matchResult.metadata());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }})
+        .transform(new Function<Metadata, String>() {
+          @Override
+          public String apply(Metadata metadata) {
+            return ((LocalResourceId) 
metadata.resourceId()).getPath().toString();
+          }})
+        .toList();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
index 38be65a..6062619 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FileIOChannelFactoryTest.java
@@ -130,7 +130,7 @@ public class FileIOChannelFactoryTest {
   }
 
   @Test
-  public void testMatchNone() throws Exception {
+  public void testMatchPatternNone() throws Exception {
     List<String> expected = ImmutableList.of();
     temporaryFolder.newFile("a");
     temporaryFolder.newFile("aa");
@@ -142,16 +142,7 @@ public class FileIOChannelFactoryTest {
   }
 
   @Test
-  public void testMatchUsingExplicitPath() throws Exception {
-    List<String> expected = 
ImmutableList.of(temporaryFolder.newFile("a").toString());
-    temporaryFolder.newFile("aa");
-
-    
assertThat(factory.match(factory.resolve(temporaryFolder.getRoot().getPath(), 
"a")),
-        containsInAnyOrder(expected.toArray(new String[expected.size()])));
-  }
-
-  @Test
-  public void testMatchUsingExplicitPathForNonExistentFile() throws Exception {
+  public void testMatchForNonExistentFile() throws Exception {
     List<String> expected = ImmutableList.of();
     temporaryFolder.newFile("aa");
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
index ce8e7e8..16c4f93 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/storage/GcsFileSystem.java
@@ -28,6 +28,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.GcsOptions;
 
 /**
@@ -41,6 +42,11 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
   }
 
   @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   protected WritableByteChannel create(GcsResourceId resourceId, CreateOptions 
createOptions)
       throws IOException {
     return options.getGcsUtil().create(resourceId.getGcsPath(), 
createOptions.mimeType());

http://git-wip-us.apache.org/repos/asf/beam/blob/d1648c47/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index 25381b8..f4e35ac 100644
--- 
a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ 
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.CreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
 
 /**
  * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
@@ -34,6 +35,11 @@ class HadoopFileSystem extends FileSystem<HadoopResourceId> {
   HadoopFileSystem() {}
 
   @Override
+  protected List<MatchResult> match(List<String> specs) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   protected WritableByteChannel create(HadoopResourceId resourceId, 
CreateOptions createOptions)
       throws IOException {
     throw new UnsupportedOperationException();

Reply via email to