[ 
https://issues.apache.org/jira/browse/BEAM-1754?focusedWorklogId=762408&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762408
 ]

ASF GitHub Bot logged work on BEAM-1754:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Apr/22 16:01
            Start Date: 26/Apr/22 16:01
    Worklog Time Spent: 10m 
      Work Description: damccorm commented on code in PR #17341:
URL: https://github.com/apache/beam/pull/17341#discussion_r858885358


##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(

Review Comment:
   ```suggestion
       this.process = childProcess.spawnSync(
   ```
   
   Nit: Not really _necessary_, but I expect this will give us a better 
exception if there are issues with spawning the process itself.
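
   One caveat worth weighing: `spawnSync` does not return until the child 
has exited, so it would block on a long-running service. A minimal sketch of 
an alternative that keeps the asynchronous `spawn` but still surfaces spawn 
failures as a rejection is below. It assumes a Node version that emits the 
`spawn` event (15.1+ or 14.17+), and `spawnService` is a hypothetical name 
that is not part of this PR.

```typescript
import { spawn, ChildProcess } from "child_process";

// Hypothetical helper (not in the PR): resolves once the child process has
// actually been spawned, and rejects if spawning itself fails (e.g. ENOENT
// when the command is not on the PATH).
function spawnService(cmd: string, args: string[]): Promise<ChildProcess> {
  return new Promise((resolve, reject) => {
    const child = spawn(cmd, args, { stdio: "inherit" });
    // "error" fires when the process could not be spawned.
    child.once("error", reject);
    // "spawn" fires once the process has started successfully.
    child.once("spawn", () => resolve(child));
  });
}
```

   With something like this, `start()` could `await spawnService(...)` 
before polling the port, so a missing `java` binary fails immediately 
instead of after the port timeout.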



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";

Review Comment:
   I don't think we want this line here anymore



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();
+      throw new Error(
+        "Timed out waiting for service after " + timeoutMs + "ms."
+      );
+    }
+  }
+}
+
+export function serviceProviderFromJavaGradleTarget(
+  gradleTarget: string,
+  args: string[] | undefined = undefined
+): () => Promise<JavaJarService> {
+  return async () => {
+    return new JavaJarService(
+      await JavaJarService.cachedJar(JavaJarService.gradleToJar(gradleTarget)),
+      args
+    );
+  };
+}
+
+export class JavaJarService extends SubprocessService {
+  static APACHE_REPOSITORY = "https://repo.maven.apache.org/maven2";;
+  static BEAM_GROUP_ID = "org.apache.beam";
+  static JAR_CACHE = path.join(os.homedir(), ".apache_beam", "cache", "jars");
+
+  constructor(jar: string, args: string[] | undefined = undefined) {
+    if (args == undefined) {
+      // TODO: (Extension) Should filesToStage be set at some higher level?
+      args = ["{{PORT}}", "--filesToStage=" + jar];
+    }
+    super("java", ["-jar", jar].concat(args));
+  }
+
+  static async cachedJar(
+    urlOrPath: string,
+    cacheDir: string = JavaJarService.JAR_CACHE
+  ): Promise<string> {
+    if (urlOrPath.match(/^https?:\/\//)) {
+      fs.mkdirSync(cacheDir, { recursive: true });
+      const dest = path.join(
+        JavaJarService.JAR_CACHE,
+        path.basename(urlOrPath)
+      );
+      if (fs.existsSync(dest)) {
+        return dest;
+      }
+      // TODO: (Cleanup) Use true temporary file.
+      const tmp = dest + ".tmp" + Math.random();
+      return new Promise((resolve, reject) => {
+        const fout = fs.createWriteStream(tmp);
+        console.log("Downloading", urlOrPath);
+        const request = https.get(urlOrPath, function (response) {
+          if (response.statusCode !== 200) {
+            reject(
+              `Error code ${response.statusCode} when downloading ${urlOrPath}`
+            );
+          }
+          response.pipe(fout);
+          fout.on("finish", function () {
+            fout.close(() => {
+              fs.renameSync(tmp, dest);
+              resolve(dest);
+            });
+          });
+        });
+      });
+    } else {
+      return urlOrPath;
+    }
+  }
+
+  static gradleToJar(
+    gradleTarget: string,
+    appendix: string | undefined = undefined,
+    version: string = beamVersion
+  ): string {
+    if (version.startsWith("0.")) {
+      // node-ts 0.x corresponds to Beam 2.x.
+      version = "2" + version.substring(1);
+    }
+    version = "2.36.0";
+    const gradlePackage = gradleTarget.match(/^:?(.*):[^:]+:?$/)![1];
+    const artifactId = "beam-" + gradlePackage.replaceAll(":", "-");
+    const projectRoot = path.resolve(
+      __dirname,
+      "..",
+      "..",
+      "..",
+      "..",
+      "..",
+      ".."
+    );

Review Comment:
   This is pretty fragile and likely to break if we restructure our directories 
or even if we change where the JavaScript is getting built to. Could we 
refactor it out into a shared constants class? It would be even better if we 
put some test in place to guarantee that it correctly points to the project 
root (e.g., that it's able to find the `.github` directory, or something else 
that can't move). Then if it breaks, it will at least be obvious.
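
   As a rough sketch of the idea, assuming a marker directory such as 
`.github` always sits at the project root: the helper below walks upward 
from a starting directory instead of hard-coding the number of `..` 
segments. `findProjectRoot` and its `marker` parameter are hypothetical 
names, not existing code in this PR.

```typescript
import * as fs from "fs";
import * as path from "path";

// Hypothetical helper: walk upward from startDir until a directory that
// contains `marker` is found; throw if the filesystem root is reached.
function findProjectRoot(startDir: string, marker: string = ".github"): string {
  let dir = path.resolve(startDir);
  while (true) {
    if (fs.existsSync(path.join(dir, marker))) {
      return dir;
    }
    const parent = path.dirname(dir);
    if (parent === dir) {
      // path.dirname of the filesystem root is the root itself.
      throw new Error(
        "Could not find " + marker + " above " + path.resolve(startDir)
      );
    }
    dir = parent;
  }
}
```

   A test could then call `findProjectRoot(__dirname)` and assert that the 
result contains `.github`, so a directory restructure fails loudly.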



##########
sdks/typescript/src/apache_beam/utils/service.ts:
##########
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+const fs = require("fs");
+const https = require("https");
+const os = require("os");
+const net = require("net");
+const path = require("path");
+const childProcess = require("child_process");
+
+// TODO: (Typescript) Why can't the var above be used as a namespace?
+import { ChildProcess } from "child_process";
+
+import { version as beamVersion } from "../version";
+
+export interface Service {
+  start: () => Promise<string>;
+  stop: () => Promise<void>;
+}
+
+export class ExternalService implements Service {
+  constructor(public address: string) {
+    this.address = address;
+  }
+  async start() {
+    return this.address;
+  }
+  async stop() {}
+}
+
+export class SubprocessService {
+  process: ChildProcess;
+  cmd: string;
+  args: string[];
+
+  constructor(cmd: string, args: string[]) {
+    this.cmd = cmd;
+    this.args = args;
+  }
+
+  async start() {
+    // TODO: (Cleanup) Choose a free port.
+    const host = "localhost";
+    const port = "7778";
+    console.log(this.args.map((arg) => arg.replace("{{PORT}}", port)));
+    this.process = childProcess.spawn(
+      this.cmd,
+      this.args.map((arg) => arg.replace("{{PORT}}", port)),
+      {
+        stdio: "inherit",
+      }
+    );
+
+    try {
+      await this.portReady(port, host, 10000);
+    } catch (error) {
+      this.process.kill();
+      throw error;
+    }
+
+    return host + ":" + port;
+  }
+
+  async stop() {
+    this.process.kill();
+  }
+
+  async portReady(port, host, timeoutMs, iterMs = 100) {
+    const start = Date.now();
+    let connected = false;
+    while (!connected && Date.now() - start < timeoutMs) {
+      if (this.process.exitCode) {
+        throw new Error("Aborted with error " + this.process.exitCode);
+      }
+      await new Promise((r) => setTimeout(r, iterMs));
+      try {
+        await new Promise<void>((resolve, reject) => {
+          const socket = net.createConnection(port, host, () => {
+            resolve();
+            socket.end();
+            connected = true;
+          });
+          socket.on("error", (err) => {
+            reject(err);
+          });
+        });
+      } catch (err) {
+        // go around again
+      }
+    }
+    if (!connected) {
+      this.process.kill();

Review Comment:
   I think it makes more sense to let the caller do this (and the caller 
already does). Regardless, we probably don't need it in both places.
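
   A minimal sketch of that split, under the assumption that the polling 
helper only reports failure and the caller owns cleanup. `waitUntil` and 
`startWithCleanup` are hypothetical names used for illustration, with the 
readiness check and the kill action passed in as callbacks.

```typescript
// Hypothetical polling helper: throws on timeout and performs no cleanup.
async function waitUntil(
  check: () => Promise<boolean>,
  timeoutMs: number,
  iterMs: number = 100
): Promise<void> {
  const start = Date.now();
  while (Date.now() - start < timeoutMs) {
    if (await check()) {
      return;
    }
    await new Promise((r) => setTimeout(r, iterMs));
  }
  throw new Error("Timed out waiting for service after " + timeoutMs + "ms.");
}

// The caller owns the process, so the kill happens in exactly one place.
async function startWithCleanup(
  check: () => Promise<boolean>,
  kill: () => void,
  timeoutMs: number,
  iterMs: number = 100
): Promise<void> {
  try {
    await waitUntil(check, timeoutMs, iterMs);
  } catch (error) {
    kill();
    throw error;
  }
}
```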





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762408)
    Time Spent: 4h  (was: 3h 50m)

> Will Dataflow ever support Node.js with an SDK similar to Java or Python?
> -------------------------------------------------------------------------
>
>                 Key: BEAM-1754
>                 URL: https://issues.apache.org/jira/browse/BEAM-1754
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-ideas
>            Reporter: Diego Zuluaga
>            Assignee: Kerry Donny-Clark
>            Priority: P3
>              Labels: node.js
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> I like the philosophy behind Dataflow and found the Java and Python samples 
> highly comprehensible. However, I have to admit that for most Node.js 
> developers, who have little background in typed languages and are used to 
> getting up to speed with frameworks incredibly fast, Dataflow might involve 
> a steeper learning curve than they/we're used to. So, I wonder if at any point 
> in time Dataflow will provide a Node.js SDK. Maybe this is out of the 
> question, but I wanted to run it by the team as it would be awesome to have 
> something along these lines!
> Thanks,
> Diego
> Question originally posted on SO:
> http://stackoverflow.com/questions/42893436/will-dataflow-ever-support-node-js-with-and-sdk-similar-to-java-or-python



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
