This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d5c3b5ee2c Locate and download Prism binary (#31796)
8d5c3b5ee2c is described below

commit 8d5c3b5ee2c4d1ad62aa09adde7b3cce99b79cd3
Author: Damon <[email protected]>
AuthorDate: Wed Jul 10 15:47:25 2024 -0700

    Locate and download Prism binary (#31796)
    
    * Stage PrismRunner implementation and dependencies
    
    * Locate and download Prism binary
    
    * Sync with head
    
    * Remove redundant check
    
    * Remove sha verification; delete files in test setup
    
    * Remove destination dir; check exists
    
    * Add tests for 404 and tag prefix
---
 .../apache/beam/runners/prism/PrismLocator.java    | 221 +++++++++++++++++++++
 .../beam/runners/prism/PrismLocatorTest.java       | 125 ++++++++++++
 2 files changed, 346 insertions(+)

diff --git 
a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
 
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
new file mode 100644
index 00000000000..f32e4d88f42
--- /dev/null
+++ 
b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Locale;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
+
+/**
+ * Locates a Prism executable based on a user's default operating system and 
architecture
+ * environment or a {@link PrismPipelineOptions#getPrismLocation()} override. 
Handles the download,
+ * unzip, {@link PosixFilePermissions}, as needed. For {@link 
#GITHUB_DOWNLOAD_PREFIX} sources,
+ * additionally performs a SHA512 verification.
+ */
+class PrismLocator {
+  static final String OS_NAME_PROPERTY = "os.name";
+  static final String ARCH_PROPERTY = "os.arch";
+  static final String USER_HOME_PROPERTY = "user.home";
+
+  private static final String ZIP_EXT = "zip";
+  private static final ReleaseInfo RELEASE_INFO = ReleaseInfo.getReleaseInfo();
+  private static final String PRISM_BIN_PATH = ".apache_beam/cache/prism/bin";
+  private static final Set<PosixFilePermission> PERMS =
+      PosixFilePermissions.fromString("rwxr-xr-x");
+  private static final String GITHUB_DOWNLOAD_PREFIX =
+      "https://github.com/apache/beam/releases/download";;
+  private static final String GITHUB_TAG_PREFIX = 
"https://github.com/apache/beam/releases/tag";;
+
+  private final PrismPipelineOptions options;
+
+  PrismLocator(PrismPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Downloads and prepares a Prism executable for use with the {@link 
PrismRunner}. The returned
+   * {@link String} is the absolute path to the Prism executable.
+   */
+  String resolve() throws IOException {
+
+    String from =
+        String.format("%s/v%s/%s.zip", GITHUB_DOWNLOAD_PREFIX, 
getSDKVersion(), buildFileName());
+
+    if (!Strings.isNullOrEmpty(options.getPrismLocation())) {
+      checkArgument(
+          !options.getPrismLocation().startsWith(GITHUB_TAG_PREFIX),
+          "Provided --prismLocation URL is not an Apache Beam Github "
+              + "Release page URL or download URL: ",
+          from);
+
+      from = options.getPrismLocation();
+    }
+
+    String fromFileName = getNameWithoutExtension(from);
+    Path to = Paths.get(userHome(), PRISM_BIN_PATH, fromFileName);
+
+    if (Files.exists(to)) {
+      return to.toString();
+    }
+
+    createDirectoryIfNeeded(to);
+
+    if (from.startsWith("http")) {
+      String result = resolve(new URL(from), to);
+      checkState(Files.exists(to), "Resolved location does not exist: %s", 
result);
+      return result;
+    }
+
+    String result = resolve(Paths.get(from), to);
+    checkState(Files.exists(to), "Resolved location does not exist: %s", 
result);
+    return result;
+  }
+
+  static Path prismBinDirectory() {
+    return Paths.get(userHome(), PRISM_BIN_PATH);
+  }
+
+  private String resolve(URL from, Path to) throws IOException {
+    BiConsumer<URL, Path> downloadFn = PrismLocator::download;
+    if (from.getPath().endsWith(ZIP_EXT)) {
+      downloadFn = PrismLocator::unzip;
+    }
+    downloadFn.accept(from, to);
+
+    Files.setPosixFilePermissions(to, PERMS);
+
+    return to.toString();
+  }
+
+  private String resolve(Path from, Path to) throws IOException {
+
+    BiConsumer<InputStream, Path> copyFn = PrismLocator::copy;
+    if (from.endsWith(ZIP_EXT)) {
+      copyFn = PrismLocator::unzip;
+    }
+
+    copyFn.accept(from.toUri().toURL().openStream(), to);
+    ByteStreams.copy(from.toUri().toURL().openStream(), 
Files.newOutputStream(to));
+    Files.setPosixFilePermissions(to, PERMS);
+
+    return to.toString();
+  }
+
+  String buildFileName() {
+    String version = getSDKVersion();
+    return String.format("apache_beam-v%s-prism-%s-%s", version, os(), arch());
+  }
+
+  private static void unzip(URL from, Path to) {
+    try {
+      unzip(from.openStream(), to);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void unzip(InputStream from, Path to) {
+    try (OutputStream out = Files.newOutputStream(to)) {
+      ZipInputStream zis = new ZipInputStream(from);
+      for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = 
zis.getNextEntry()) {
+        InputStream in = ByteStreams.limit(zis, entry.getSize());
+        ByteStreams.copy(in, out);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void copy(InputStream from, Path to) {
+    try {
+      ByteStreams.copy(from, Files.newOutputStream(to));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void download(URL from, Path to) {
+    try {
+      ByteStreams.copy(from.openStream(), Files.newOutputStream(to));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static String getNameWithoutExtension(String path) {
+    return org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files
+        .getNameWithoutExtension(path);
+  }
+
+  private String getSDKVersion() {
+    if (Strings.isNullOrEmpty(options.getPrismVersionOverride())) {
+      return RELEASE_INFO.getSdkVersion();
+    }
+    return options.getPrismVersionOverride();
+  }
+
+  private static String os() {
+    String result = mustGetPropertyAsLowerCase(OS_NAME_PROPERTY);
+    if (result.contains("mac")) {
+      return "darwin";
+    }
+    return result;
+  }
+
+  private static String arch() {
+    String result = mustGetPropertyAsLowerCase(ARCH_PROPERTY);
+    if (result.contains("aarch")) {
+      return "arm64";
+    }
+    return result;
+  }
+
+  private static String userHome() {
+    return mustGetPropertyAsLowerCase(USER_HOME_PROPERTY);
+  }
+
+  private static String mustGetPropertyAsLowerCase(String name) {
+    return checkStateNotNull(System.getProperty(name), "System property: " + 
name + " not set")
+        .toLowerCase();
+  }
+
+  private static void createDirectoryIfNeeded(Path path) throws IOException {
+    Path parent = path.getParent();
+    if (parent == null) {
+      return;
+    }
+    Files.createDirectories(parent);
+  }
+}
diff --git 
a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java
 
b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java
new file mode 100644
index 00000000000..982a8bfd657
--- /dev/null
+++ 
b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.beam.runners.prism.PrismLocator.prismBinDirectory;
+import static 
org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrismLocator}. */
+@RunWith(JUnit4.class)
+public class PrismLocatorTest {
+
+  private static final Path DESTINATION_DIRECTORY = prismBinDirectory();
+
+  /** Removes the Prism cache directory, including any nested directories, before each test. */
+  @Before
+  public void setup() throws IOException {
+    if (!Files.exists(DESTINATION_DIRECTORY)) {
+      return;
+    }
+    Files.walkFileTree(
+        DESTINATION_DIRECTORY,
+        new SimpleFileVisitor<Path>() {
+          @Override
+          public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+              throws IOException {
+            Files.delete(file);
+            return FileVisitResult.CONTINUE;
+          }
+
+          @Override
+          public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+              throws IOException {
+            // Each directory is deleted after its contents, ending with DESTINATION_DIRECTORY.
+            if (exc != null) {
+              throw exc;
+            }
+            Files.delete(dir);
+            return FileVisitResult.CONTINUE;
+          }
+        });
+  }
+
+  @Test
+  public void givenVersionOverride_thenResolves() throws IOException {
+    assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+    PrismPipelineOptions options = options();
+    options.setPrismVersionOverride("2.57.0");
+    PrismLocator underTest = new PrismLocator(options);
+    String got = underTest.resolve();
+    assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+    assertThat(got).contains("2.57.0");
+    Path gotPath = Paths.get(got);
+    assertThat(Files.exists(gotPath)).isTrue();
+  }
+
+  @Test
+  public void givenHttpPrismLocationOption_thenResolves() throws IOException {
+    assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+    PrismPipelineOptions options = options();
+    options.setPrismLocation(
+        "https://github.com/apache/beam/releases/download/v2.57.0/apache_beam-v2.57.0-prism-darwin-arm64.zip");
+    PrismLocator underTest = new PrismLocator(options);
+    String got = underTest.resolve();
+    assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+    Path gotPath = Paths.get(got);
+    assertThat(Files.exists(gotPath)).isTrue();
+  }
+
+  @Test
+  public void givenFilePrismLocationOption_thenResolves() throws IOException {
+    assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+    PrismPipelineOptions options = options();
+    options.setPrismLocation(getLocalPrismBuildOrIgnoreTest());
+    PrismLocator underTest = new PrismLocator(options);
+    String got = underTest.resolve();
+    assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+    Path gotPath = Paths.get(got);
+    assertThat(Files.exists(gotPath)).isTrue();
+  }
+
+  /** A local zip must be unzipped (not copied raw) into the cache; needs no network access. */
+  @Test
+  public void givenLocalZipPrismLocationOption_thenUnzips() throws IOException {
+    assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+    byte[] want = "fake prism binary".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    Path zip = Files.createTempDirectory("prism-locator-test").resolve("fake-prism.zip");
+    try (java.util.zip.ZipOutputStream zos =
+        new java.util.zip.ZipOutputStream(Files.newOutputStream(zip))) {
+      zos.putNextEntry(new java.util.zip.ZipEntry("prism"));
+      zos.write(want);
+      zos.closeEntry();
+    }
+    PrismPipelineOptions options = options();
+    options.setPrismLocation(zip.toString());
+    PrismLocator underTest = new PrismLocator(options);
+    String got = underTest.resolve();
+    assertThat(got).isEqualTo(DESTINATION_DIRECTORY.resolve("fake-prism").toString());
+    assertThat(Files.readAllBytes(Paths.get(got))).isEqualTo(want);
+  }
+
+  @Test
+  public void givenGithubTagPrismLocationOption_thenThrows() {
+    PrismPipelineOptions options = options();
+    options.setPrismLocation(
+        "https://github.com/apache/beam/releases/tag/v2.57.0/apache_beam-v2.57.0-prism-darwin-amd64.zip");
+    PrismLocator underTest = new PrismLocator(options);
+    IllegalArgumentException error =
+        assertThrows(IllegalArgumentException.class, underTest::resolve);
+    assertThat(error.getMessage())
+        .contains(
+            "Provided --prismLocation URL is not an Apache Beam Github Release page URL or download URL");
+  }
+
+  @Test
+  public void givenPrismLocation404_thenThrows() {
+    PrismPipelineOptions options = options();
+    options.setPrismLocation("https://example.com/i/dont/exist.zip");
+    PrismLocator underTest = new PrismLocator(options);
+    RuntimeException error = assertThrows(RuntimeException.class, underTest::resolve);
+    assertThat(error.getMessage()).contains("NotFoundException");
+  }
+
+  private static PrismPipelineOptions options() {
+    return PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
+  }
+}

Reply via email to