This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d5c3b5ee2c Locate and download Prism binary (#31796)
8d5c3b5ee2c is described below
commit 8d5c3b5ee2c4d1ad62aa09adde7b3cce99b79cd3
Author: Damon <[email protected]>
AuthorDate: Wed Jul 10 15:47:25 2024 -0700
Locate and download Prism binary (#31796)
* Stage PrismRunner implementation and dependencies
* Locate and download Prism binary
* Sync with head
* Remove redundant check
* Remove sha verification; delete files in test setup
* Remove destination dir; check exists
* Add tests for 404 and tag prefix
---
.../apache/beam/runners/prism/PrismLocator.java | 221 +++++++++++++++++++++
.../beam/runners/prism/PrismLocatorTest.java | 125 ++++++++++++
2 files changed, 346 insertions(+)
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
new file mode 100644
index 00000000000..f32e4d88f42
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismLocator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.PosixFilePermission;
+import java.nio.file.attribute.PosixFilePermissions;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
+
+/**
+ * Locates a Prism executable based on a user's default operating system and architecture
+ * environment or a {@link PrismPipelineOptions#getPrismLocation()} override. Handles the
+ * download, unzip, and {@link PosixFilePermissions} as needed. Resolved executables are cached
+ * in {@link #prismBinDirectory()} and reused by subsequent calls.
+ */
+class PrismLocator {
+  static final String OS_NAME_PROPERTY = "os.name";
+  static final String ARCH_PROPERTY = "os.arch";
+  static final String USER_HOME_PROPERTY = "user.home";
+
+  private static final String ZIP_EXT = "zip";
+  private static final ReleaseInfo RELEASE_INFO = ReleaseInfo.getReleaseInfo();
+  private static final String PRISM_BIN_PATH = ".apache_beam/cache/prism/bin";
+  private static final Set<PosixFilePermission> PERMS =
+      PosixFilePermissions.fromString("rwxr-xr-x");
+  private static final String GITHUB_DOWNLOAD_PREFIX =
+      "https://github.com/apache/beam/releases/download";
+  private static final String GITHUB_TAG_PREFIX = "https://github.com/apache/beam/releases/tag";
+
+  private final PrismPipelineOptions options;
+
+  PrismLocator(PrismPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Downloads and prepares a Prism executable for use with the {@link PrismRunner}. The returned
+   * {@link String} is the path to the Prism executable inside {@link #prismBinDirectory()}.
+   *
+   * <p>The source is {@link PrismPipelineOptions#getPrismLocation()} when set, otherwise the
+   * GitHub release download URL built from the SDK (or overridden) version. An executable that
+   * already exists in the cache directory is returned without downloading or copying again.
+   *
+   * @throws IllegalArgumentException if the configured location starts with the GitHub release
+   *     tag page prefix
+   * @throws IOException if the cache directory cannot be created or permissions cannot be set
+   * @throws RuntimeException if downloading, copying or unzipping the source fails
+   */
+  String resolve() throws IOException {
+
+    String from =
+        String.format("%s/v%s/%s.zip", GITHUB_DOWNLOAD_PREFIX, getSDKVersion(), buildFileName());
+
+    if (!Strings.isNullOrEmpty(options.getPrismLocation())) {
+      // Report the rejected user-supplied value, not the default download URL.
+      checkArgument(
+          !options.getPrismLocation().startsWith(GITHUB_TAG_PREFIX),
+          "Provided --prismLocation URL is not an Apache Beam Github "
+              + "Release page URL or download URL: %s",
+          options.getPrismLocation());
+
+      from = options.getPrismLocation();
+    }
+
+    Path to = prismBinDirectory().resolve(getNameWithoutExtension(from));
+
+    if (Files.exists(to)) {
+      return to.toString();
+    }
+
+    createDirectoryIfNeeded(to);
+
+    String result;
+    try {
+      if (from.startsWith("http")) {
+        result = resolve(new URL(from), to);
+      } else {
+        result = resolve(Paths.get(from), to);
+      }
+    } catch (IOException | RuntimeException e) {
+      // A partial file would otherwise satisfy the Files.exists check above on the next call
+      // and be returned as if it were a usable executable.
+      deleteIncomplete(to, e);
+      throw e;
+    }
+
+    checkState(Files.exists(to), "Resolved location does not exist: %s", result);
+    return result;
+  }
+
+  /** Returns the directory, under the user's home, in which Prism executables are cached. */
+  static Path prismBinDirectory() {
+    return Paths.get(userHome(), PRISM_BIN_PATH);
+  }
+
+  /** Downloads (and unzips, for zip URLs) {@code from} into {@code to} and marks it executable. */
+  private String resolve(URL from, Path to) throws IOException {
+    BiConsumer<URL, Path> downloadFn = PrismLocator::download;
+    if (isZip(from.getPath())) {
+      downloadFn = PrismLocator::unzip;
+    }
+    downloadFn.accept(from, to);
+
+    Files.setPosixFilePermissions(to, PERMS);
+
+    return to.toString();
+  }
+
+  /** Copies (or unzips, for zip files) the local {@code from} into {@code to}. */
+  private String resolve(Path from, Path to) throws IOException {
+
+    BiConsumer<InputStream, Path> copyFn = PrismLocator::copy;
+    // Path#endsWith(String) compares whole name elements, so the textual suffix is tested.
+    if (isZip(from.toString())) {
+      copyFn = PrismLocator::unzip;
+    }
+
+    try (InputStream in = Files.newInputStream(from)) {
+      copyFn.accept(in, to);
+    }
+    Files.setPosixFilePermissions(to, PERMS);
+
+    return to.toString();
+  }
+
+  /** Builds the release asset name, without extension, for the current version, OS and arch. */
+  String buildFileName() {
+    String version = getSDKVersion();
+    return String.format("apache_beam-v%s-prism-%s-%s", version, os(), arch());
+  }
+
+  private static boolean isZip(String name) {
+    return name.endsWith("." + ZIP_EXT);
+  }
+
+  private static void unzip(URL from, Path to) {
+    try {
+      // unzip(InputStream, Path) closes the stream it is given.
+      unzip(from.openStream(), to);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Writes the content of every file entry of the zip stream to {@code to}; closes the stream. */
+  private static void unzip(InputStream from, Path to) {
+    try (ZipInputStream zis = new ZipInputStream(from);
+        OutputStream out = Files.newOutputStream(to)) {
+      for (ZipEntry entry = zis.getNextEntry(); entry != null; entry = zis.getNextEntry()) {
+        if (entry.isDirectory()) {
+          continue;
+        }
+        // ZipInputStream signals end-of-stream at the end of the current entry, so no explicit
+        // limit is needed; ZipEntry#getSize may be -1 when the size is not known up front.
+        ByteStreams.copy(zis, out);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Copies {@code from} to {@code to}; the caller remains responsible for closing the input. */
+  private static void copy(InputStream from, Path to) {
+    try (OutputStream out = Files.newOutputStream(to)) {
+      ByteStreams.copy(from, out);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void download(URL from, Path to) {
+    // Open the remote stream first so that a failed request does not create the target file.
+    try (InputStream in = from.openStream();
+        OutputStream out = Files.newOutputStream(to)) {
+      ByteStreams.copy(in, out);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Best-effort removal of {@code file}; a failure is recorded on {@code cause}. */
+  private static void deleteIncomplete(Path file, Exception cause) {
+    try {
+      Files.deleteIfExists(file);
+    } catch (IOException e) {
+      cause.addSuppressed(e);
+    }
+  }
+
+  private static String getNameWithoutExtension(String path) {
+    return org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files
+        .getNameWithoutExtension(path);
+  }
+
+  private String getSDKVersion() {
+    if (Strings.isNullOrEmpty(options.getPrismVersionOverride())) {
+      return RELEASE_INFO.getSdkVersion();
+    }
+    return options.getPrismVersionOverride();
+  }
+
+  private static String os() {
+    String result = mustGetPropertyAsLowerCase(OS_NAME_PROPERTY);
+    if (result.contains("mac")) {
+      return "darwin";
+    }
+    return result;
+  }
+
+  private static String arch() {
+    String result = mustGetPropertyAsLowerCase(ARCH_PROPERTY);
+    if (result.contains("aarch")) {
+      return "arm64";
+    }
+    return result;
+  }
+
+  private static String userHome() {
+    // Paths are case-sensitive on many file systems, so the value must be used verbatim.
+    return mustGetProperty(USER_HOME_PROPERTY);
+  }
+
+  private static String mustGetProperty(String name) {
+    return checkStateNotNull(System.getProperty(name), "System property: " + name + " not set");
+  }
+
+  private static String mustGetPropertyAsLowerCase(String name) {
+    return mustGetProperty(name).toLowerCase(java.util.Locale.ROOT);
+  }
+
+  private static void createDirectoryIfNeeded(Path path) throws IOException {
+    Path parent = path.getParent();
+    if (parent == null) {
+      return;
+    }
+    Files.createDirectories(parent);
+  }
+}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java
new file mode 100644
index 00000000000..982a8bfd657
--- /dev/null
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismLocatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.beam.runners.prism.PrismLocator.prismBinDirectory;
+import static
org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
+import static org.junit.Assert.assertThrows;
+
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link PrismLocator}. */
+@RunWith(JUnit4.class)
+public class PrismLocatorTest {
+
+ private static final Path DESTINATION_DIRECTORY = prismBinDirectory();
+
+ @Before
+ public void setup() throws IOException {
+ if (Files.exists(DESTINATION_DIRECTORY)) {
+ Files.walkFileTree(
+ DESTINATION_DIRECTORY,
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes
attrs)
+ throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+
+ Files.delete(DESTINATION_DIRECTORY);
+ }
+ }
+
+ @Test
+ public void givenVersionOverride_thenResolves() throws IOException {
+ assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+ PrismPipelineOptions options = options();
+ options.setPrismVersionOverride("2.57.0");
+ PrismLocator underTest = new PrismLocator(options);
+ String got = underTest.resolve();
+ assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+ assertThat(got).contains("2.57.0");
+ Path gotPath = Paths.get(got);
+ assertThat(Files.exists(gotPath)).isTrue();
+ }
+
+ @Test
+ public void givenHttpPrismLocationOption_thenResolves() throws IOException {
+ assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+ PrismPipelineOptions options = options();
+ options.setPrismLocation(
+
"https://github.com/apache/beam/releases/download/v2.57.0/apache_beam-v2.57.0-prism-darwin-arm64.zip");
+ PrismLocator underTest = new PrismLocator(options);
+ String got = underTest.resolve();
+ assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+ Path gotPath = Paths.get(got);
+ assertThat(Files.exists(gotPath)).isTrue();
+ }
+
+ @Test
+ public void givenFilePrismLocationOption_thenResolves() throws IOException {
+ assertThat(Files.exists(DESTINATION_DIRECTORY)).isFalse();
+ PrismPipelineOptions options = options();
+ options.setPrismLocation(getLocalPrismBuildOrIgnoreTest());
+ PrismLocator underTest = new PrismLocator(options);
+ String got = underTest.resolve();
+ assertThat(got).contains(DESTINATION_DIRECTORY.toString());
+ Path gotPath = Paths.get(got);
+ assertThat(Files.exists(gotPath)).isTrue();
+ }
+
+ @Test
+ public void givenGithubTagPrismLocationOption_thenThrows() {
+ PrismPipelineOptions options = options();
+ options.setPrismLocation(
+
"https://github.com/apache/beam/releases/tag/v2.57.0/apache_beam-v2.57.0-prism-darwin-amd64.zip");
+ PrismLocator underTest = new PrismLocator(options);
+ IllegalArgumentException error =
+ assertThrows(IllegalArgumentException.class, underTest::resolve);
+ assertThat(error.getMessage())
+ .contains(
+ "Provided --prismLocation URL is not an Apache Beam Github Release
page URL or download URL");
+ }
+
+ @Test
+ public void givenPrismLocation404_thenThrows() {
+ PrismPipelineOptions options = options();
+ options.setPrismLocation("https://example.com/i/dont/exist.zip");
+ PrismLocator underTest = new PrismLocator(options);
+ RuntimeException error = assertThrows(RuntimeException.class,
underTest::resolve);
+ assertThat(error.getMessage()).contains("NotFoundException");
+ }
+
+ private static PrismPipelineOptions options() {
+ return PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
+ }
+}