[
https://issues.apache.org/jira/browse/BEAM-12473?focusedWorklogId=610137&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-610137
]
ASF GitHub Bot logged work on BEAM-12473:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Jun/21 07:20
Start Date: 14/Jun/21 07:20
Worklog Time Spent: 10m
Work Description: ibzib commented on a change in pull request #14986:
URL: https://github.com/apache/beam/pull/14986#discussion_r650217041
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.Optional;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+public class UdfTypeUtils {
+
+ public static class CombineFnDelegate<InputT, AccumT, OutputT>
Review comment:
Add a javadoc comment.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
@Override
public RelDataType getType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+ Type inputType = getInputType();
+ if (inputType instanceof TypeVariable) {
+ throw new IllegalArgumentException(
+ "Unable to infer SQL type from type variable "
+ + inputType
+ + ". This usually means you are trying to use a generic type whose type information "
+ + "is not known at runtime. You can wrap your CombineFn into typed subclass"
+ + " by 'new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}'");
Review comment:
Nit: if the user literally types `new UdfTypeUtils.CombineFnDelegate<>(combineFn) {}`,
it will fail to compile with `Cannot use '<>' with anonymous inner classes`.
Can we put placeholders for the implementation types in the message instead?
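For illustration, here is a minimal sketch of why explicit type arguments matter. It uses a hypothetical stand-in class, `TypedDelegate`, instead of the real `CombineFnDelegate`, so that it compiles without Beam on the classpath. An anonymous subclass created with concrete type arguments records them in its generic superclass, whereas one created inside a generic method only records a type variable. (The compile error quoted above is what a Java 8 compiler reports; newer language levels may accept the diamond form.)

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for CombineFnDelegate; not part of Beam.
abstract class TypedDelegate<InputT, OutputT> {
  // Returns the type argument at the given index as recorded on the
  // direct generic superclass, or null if none is recorded.
  Type typeArgument(int index) {
    Type superClass = getClass().getGenericSuperclass();
    if (superClass instanceof ParameterizedType) {
      return ((ParameterizedType) superClass).getActualTypeArguments()[index];
    }
    return null;
  }
}

class TypedDelegateDemo {
  // Explicit type arguments: reflection sees String and Long.
  static TypedDelegate<String, Long> typed() {
    return new TypedDelegate<String, Long>() {};
  }

  // Inside a generic method, T is still only a type variable at runtime.
  static <T> TypedDelegate<T, T> untyped() {
    return new TypedDelegate<T, T>() {};
  }

  public static void main(String[] args) {
    System.out.println(typed().typeArgument(0));   // a Class (java.lang.String)
    System.out.println(untyped().typeArgument(0)); // a TypeVariable named T
  }
}
```

A message with placeholders, for example `new UdfTypeUtils.CombineFnDelegate<InputT, AccumT, OutputT>(combineFn) {}`, would point users at the first form.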
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -70,7 +72,16 @@ public String getName() {
@Override
public RelDataType getType(RelDataTypeFactory typeFactory) {
- return CalciteUtils.sqlTypeWithAutoCast(typeFactory, getInputType());
+ Type inputType = getInputType();
+ if (inputType instanceof TypeVariable) {
Review comment:
I wonder whether, here and elsewhere, we should check
`!(inputType instanceof ParameterizedType)` instead of
`inputType instanceof TypeVariable`. I'm not sure we can handle
`GenericArrayType` or `WildcardType` either.
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/reflect/Type.html
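As a reference point for this discussion, the sketch below classifies a `java.lang.reflect.Type` into the five kinds the JDK can return. `TypeKinds` and its fields are hypothetical names used only for illustration. Note that a non-generic type such as `String` is reported as a plain `Class`, which is not a `ParameterizedType`, so whichever check is chosen would presumably need to keep accepting `Class`.

```java
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.List;

// Hypothetical helper for illustration; not part of Beam.
class TypeKinds {
  // These fields exist only so reflection can read their generic types.
  static String plain;
  static List<String> parameterized;
  static List<?> wildcardList;
  static List<String>[] genericArray;

  // Names the reflective kind of the given type.
  static String kindOf(Type type) {
    if (type instanceof Class) return "Class";
    if (type instanceof ParameterizedType) return "ParameterizedType";
    if (type instanceof TypeVariable) return "TypeVariable";
    if (type instanceof GenericArrayType) return "GenericArrayType";
    if (type instanceof WildcardType) return "WildcardType";
    return "unknown";
  }

  // Looks up the generic type of one of the fields declared above.
  static Type fieldType(String name) {
    try {
      return TypeKinds.class.getDeclaredField(name).getGenericType();
    } catch (NoSuchFieldException e) {
      throw new IllegalArgumentException(e);
    }
  }

  // The wildcard is the single type argument of List<?>.
  static Type wildcard() {
    return ((ParameterizedType) fieldType("wildcardList")).getActualTypeArguments()[0];
  }

  public static void main(String[] args) {
    System.out.println(kindOf(fieldType("plain")));              // Class
    System.out.println(kindOf(fieldType("parameterized")));      // ParameterizedType
    System.out.println(kindOf(List.class.getTypeParameters()[0])); // TypeVariable
    System.out.println(kindOf(fieldType("genericArray")));       // GenericArrayType
    System.out.println(kindOf(wildcard()));                      // WildcardType
  }
}
```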
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+public class UdfTypeUtils {
Review comment:
Why do we need this outer class?
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+public class UdfTypeUtils {
+
+ public static class CombineFnDelegate<InputT, AccumT, OutputT>
+ extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+ private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+ protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public AccumT createAccumulator() {
+ return delegate.createAccumulator();
+ }
+
+ @Override
+ public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+ return delegate.addInput(mutableAccumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return delegate.mergeAccumulators(accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(AccumT accumulator) {
+ return delegate.extractOutput(accumulator);
+ }
+
+ @Override
+ public AccumT compact(AccumT accumulator) {
+ return delegate.compact(accumulator);
+ }
+
+ @Override
+ public OutputT apply(Iterable<? extends InputT> inputs) {
+ return delegate.apply(inputs);
+ }
+
+ @Override
+ public OutputT defaultValue() {
+ return delegate.defaultValue();
+ }
+
+ @Override
+ public TypeDescriptor<OutputT> getOutputType() {
+ return Optional.<TypeDescriptor<OutputT>>ofNullable(getGenericSuperTypeAtIndex(2))
+ .orElse(delegate.getOutputType());
+ }
+
+ @Override
+ public TypeDescriptor<InputT> getInputType() {
+ return Optional.<TypeDescriptor<InputT>>ofNullable(getGenericSuperTypeAtIndex(0))
+ .orElse(delegate.getInputType());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Nullable
+ private <T> TypeDescriptor<T> getGenericSuperTypeAtIndex(int index) {
+ Type superClass = getClass().getGenericSuperclass();
+ if (superClass != null) {
+ ParameterizedType superType = (ParameterizedType) superClass;
Review comment:
What if `superClass` isn't an instance of `ParameterizedType`? IIRC that
is possible with multiple layers of inheritance. (Not sure why anyone
would do that, but we should probably handle it regardless.)
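One possible way to handle that case is to walk up the superclass chain until the parameterized base type is found, rather than casting the direct superclass. The sketch below is an assumption about how this could look, not the PR's implementation; `Base`, `Typed`, `Leaf`, and `SuperTypeWalk` are hypothetical stand-ins so that it compiles without Beam.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for CombineFnDelegate and two layers of subclasses.
abstract class Base<InputT, OutputT> {}

class Typed extends Base<String, Long> {}

// Leaf's direct generic superclass is the plain class Typed,
// so casting it to ParameterizedType would throw ClassCastException.
class Leaf extends Typed {}

class SuperTypeWalk {
  // Returns the type argument that some ancestor passes to Base at the
  // given index, or null if no ancestor extends Base<...> directly.
  static Type typeArgumentOfBase(Class<?> clazz, int index) {
    for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
      Type superType = c.getGenericSuperclass();
      if (superType instanceof ParameterizedType
          && ((ParameterizedType) superType).getRawType() == Base.class) {
        return ((ParameterizedType) superType).getActualTypeArguments()[index];
      }
    }
    return null;
  }

  public static void main(String[] args) {
    System.out.println(Leaf.class.getGenericSuperclass() instanceof ParameterizedType); // false
    System.out.println(typeArgumentOfBase(Leaf.class, 0)); // class java.lang.String
  }
}
```

This walk does not resolve type variables forwarded through an intermediate generic class (for example `class Mid<T> extends Base<T, Long>`); in that case it would still return a `TypeVariable`, so a caller would need a fallback such as the delegate's own type.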
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfTypeUtils.java
##########
@@ -0,0 +1,144 @@
+public class UdfTypeUtils {
+
+ public static class CombineFnDelegate<InputT, AccumT, OutputT>
+ extends Combine.CombineFn<InputT, AccumT, OutputT> {
+
+ private final Combine.CombineFn<InputT, AccumT, OutputT> delegate;
+
+ protected CombineFnDelegate(Combine.CombineFn<InputT, AccumT, OutputT> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public AccumT createAccumulator() {
+ return delegate.createAccumulator();
+ }
+
+ @Override
+ public AccumT addInput(AccumT mutableAccumulator, InputT input) {
+ return delegate.addInput(mutableAccumulator, input);
+ }
+
+ @Override
+ public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+ return delegate.mergeAccumulators(accumulators);
+ }
+
+ @Override
+ public OutputT extractOutput(AccumT accumulator) {
+ return delegate.extractOutput(accumulator);
+ }
+
+ @Override
+ public AccumT compact(AccumT accumulator) {
+ return delegate.compact(accumulator);
+ }
+
+ @Override
+ public OutputT apply(Iterable<? extends InputT> inputs) {
+ return delegate.apply(inputs);
+ }
+
+ @Override
+ public OutputT defaultValue() {
+ return delegate.defaultValue();
+ }
+
+ @Override
+ public TypeDescriptor<OutputT> getOutputType() {
Review comment:
Nit: we could move getOutputType and getInputType to the top of the
class to highlight they're the only methods that don't just defer to the
delegate.
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdafImpl.java
##########
@@ -102,17 +117,23 @@ protected Type getOutputType() {
return combineFn.getOutputType().getType();
}
+ @Nullable
+ private Type getDeclaredInputType() {
Review comment:
Why do we need a separate method?
##########
File path:
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
##########
@@ -63,6 +69,20 @@ public void mergeAccumulators() {
assertEquals(2L, merged);
}
+ @Test
+ public void testParameterExtractionFromCombineFn_CombineFnDelegate() {
Review comment:
This test is in class `LazyAggregateCombineFnTest`, but it doesn't use
`LazyAggregateCombineFn` at all. Since `UdafImplTest.java` doesn't exist,
let's create a new file for this test.
Issue Time Tracking
-------------------
Worklog Id: (was: 610137)
Time Spent: 40m (was: 0.5h)
> ClassCastException when using registerUdaf in Calcite SQL
> ---------------------------------------------------------
>
> Key: BEAM-12473
> URL: https://issues.apache.org/jira/browse/BEAM-12473
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Affects Versions: 2.31.0
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: P2
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Using {{registerUdaf}} as follows:
> {code:java}
> .registerUdaf(
> "LONGEST_WORD",
> Max.of(
> (Comparator<String> & Serializable)
> (String a, String b) ->
> Integer.compare(a.length(), b.length()))))
> {code}
> results in the following exception:
> {noformat}
> Caused by: java.lang.ClassCastException: class sun.reflect.generics.reflectiveObjects.TypeVariableImpl cannot be cast to class java.lang.Class (sun.reflect.generics.reflectiveObjects.TypeVariableImpl and java.lang.Class are in module java.base of loader 'bootstrap')
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.sqlTypeWithAutoCast(CalciteUtils.java:327)
> at org.apache.beam.sdk.extensions.sql.impl.UdafImpl$1.getType(UdafImpl.java:73)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:315)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader.toOp(CalciteCatalogReader.java:302)
> at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.CalciteCatalogReader.lambda$lookupOperatorOverloads$3(CalciteCatalogReader.java:271)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
> {noformat}
> If this way of registering a UDAF is not supported, it should throw a more
> user-friendly exception.