apilloud commented on a change in pull request #13835:
URL: https://github.com/apache/beam/pull/13835#discussion_r566986727
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -217,10 +226,14 @@ public Calc copy(RelTraitSet traitSet, RelNode input,
RexProgram program) {
Types.lookupMethod(DoFn.ProcessContext.class, "output",
Object.class),
output))));
- CalcFn calcFn = new CalcFn(builder.toBlock().toString(), outputSchema);
+ CalcFn calcFn = new CalcFn(builder.toBlock().toString(), outputSchema,
getJarPaths(program));
// validate generated code
- calcFn.compile();
+ try {
+ calcFn.compile();
+ } catch (IOException e) {
Review comment:
nit: possibly handle this inside `compile()` so it doesn't expose an
IOException?
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JavaUdfLoader.java
##########
@@ -111,20 +115,39 @@ private File downloadFile(String inputPath, String
mimeType) throws IOException
}
}
+ private File getLocalJar(String inputJarPath) throws IOException {
+ if (!jarCache.containsKey(inputJarPath)) {
+ jarCache.put(inputJarPath, downloadFile(inputJarPath,
"application/java-archive"));
+ }
+ return jarCache.get(inputJarPath);
+ }
+
private ClassLoader createClassLoader(String inputJarPath) throws
IOException {
- File tmpJar = downloadFile(inputJarPath, "application/java-archive");
+ File tmpJar = getLocalJar(inputJarPath);
return new URLClassLoader(new URL[] {tmpJar.toURI().toURL()});
}
+ public ClassLoader createClassLoader(List<String> inputJarPaths) throws
IOException {
+ List<File> localJars = new ArrayList<>();
+ for (String inputJar : inputJarPaths) {
+ localJars.add(getLocalJar(inputJar));
Review comment:
Couldn't you merge these into a single loop?
`urls.add(getLocalJar(inputJar).toURI().toURL())`
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -274,6 +298,25 @@ public void processElement(ProcessContext c) {
}
}
+ private static List<String> getJarPaths(RexProgram program) {
+ List<String> jarPaths = new ArrayList<>();
Review comment:
Use `ImmutableList.Builder` instead.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
##########
@@ -274,6 +298,25 @@ public void processElement(ProcessContext c) {
}
}
+ private static List<String> getJarPaths(RexProgram program) {
+ List<String> jarPaths = new ArrayList<>();
+ for (RexNode node : program.getExprList()) {
+ if (node instanceof RexCall) {
+ SqlOperator op = ((RexCall) node).op;
Review comment:
A RexNode can only refer to an input or previous RexNode in the list
returned by `getExprList`, so that is handled here.
https://github.com/apache/calcite/blob/3d13846a13398a1ba6c1fa84a7d0c0cc543f23d4/core/src/main/java/org/apache/calcite/rex/RexProgram.java#L80-L85
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]