[
https://issues.apache.org/jira/browse/BEAM-4700?focusedWorklogId=117971&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117971
]
ASF GitHub Bot logged work on BEAM-4700:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/Jul/18 13:58
Start Date: 01/Jul/18 13:58
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5849: [BEAM-4700]
Convert Beam Row to Avatica Row in BeamEnumerableConverter
URL: https://github.com/apache/beam/pull/5849
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
index 015e8711753..0924542e52c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamEnumerableConverter.java
@@ -26,6 +26,7 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
@@ -42,6 +43,7 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
@@ -71,6 +73,7 @@
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.joda.time.Duration;
+import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -160,14 +163,12 @@ public boolean isReached() {
private static PipelineResult limitRun(
PipelineOptions options,
BeamRelNode node,
- DoFn<Row, KV<String, Row>> collectDoFn,
DoFn<KV<String, Row>, Void> limitCounterDoFn,
LimitStateVar limitStateVar) {
options.as(DirectOptions.class).setBlockOnRun(false);
Pipeline pipeline = Pipeline.create(options);
- BeamSqlRelUtils.toPCollection(pipeline, node)
- .apply(ParDo.of(collectDoFn))
- .apply(ParDo.of(limitCounterDoFn));
+ PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
+ resultCollection.apply(ParDo.of(new LimitCollector())).apply(ParDo.of(limitCounterDoFn));
PipelineResult result = pipeline.run();
@@ -207,7 +208,8 @@ private static PipelineResult limitRun(
Collector.globalValues.put(id, values);
Pipeline pipeline = Pipeline.create(options);
- BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(new Collector()));
+ PCollection<Row> resultCollection = BeamSqlRelUtils.toPCollection(pipeline, node);
+ resultCollection.apply(ParDo.of(new Collector()));
PipelineResult result = pipeline.run();
result.waitUntilFinish();
@@ -233,7 +235,7 @@ private static PipelineResult limitRun(
LimitCanceller.globalLimitArguments.put(id, limitCount);
LimitCanceller.globalStates.put(id, limitStateVar);
LimitCollector.globalValues.put(id, values);
- limitRun(options, node, new LimitCollector(), new LimitCanceller(), limitStateVar);
+ limitRun(options, node, new LimitCanceller(), limitStateVar);
LimitCanceller.globalLimitArguments.remove(id);
LimitCanceller.globalStates.remove(id);
LimitCollector.globalValues.remove(id);
@@ -276,6 +278,7 @@ public void processElement(
}
private static class LimitCollector extends DoFn<Row, KV<String, Row>> {
+
// This will only work on the direct runner.
private static final Map<Long, Queue<Object>> globalValues =
new ConcurrentHashMap<Long, Queue<Object>>();
@@ -290,17 +293,18 @@ public void startBundle(StartBundleContext context) {
@ProcessElement
public void processElement(ProcessContext context) {
- Object[] input = context.element().getValues().toArray();
- if (input.length == 1) {
- values.add(input[0]);
+ Object[] avaticaRow = rowToAvatica(context.element());
+ if (avaticaRow.length == 1) {
+ values.add(avaticaRow[0]);
} else {
- values.add(input);
+ values.add(avaticaRow);
}
context.output(KV.of("DummyKey", context.element()));
}
}
private static class Collector extends DoFn<Row, Void> {
+
// This will only work on the direct runner.
private static final Map<Long, Queue<Object>> globalValues =
new ConcurrentHashMap<Long, Queue<Object>>();
@@ -315,15 +319,62 @@ public void startBundle(StartBundleContext context) {
@ProcessElement
public void processElement(ProcessContext context) {
- Object[] input = context.element().getValues().toArray();
- if (input.length == 1) {
- values.add(input[0]);
+ Object[] avaticaRow = rowToAvatica(context.element());
+ if (avaticaRow.length == 1) {
+ values.add(avaticaRow[0]);
} else {
- values.add(input);
+ values.add(avaticaRow);
}
}
}
+ private static Object[] rowToAvatica(Row row) {
+ Schema schema = row.getSchema();
+ Object[] convertedColumns = new Object[schema.getFields().size()];
+ int i = 0;
+ for (Schema.Field field : schema.getFields()) {
+ convertedColumns[i] = fieldToAvatica(field.getType(), row.getValue(i));
+ ++i;
+ }
+ return convertedColumns;
+ }
+
+ private static Object fieldToAvatica(Schema.FieldType type, Object beamValue) {
+ switch (type.getTypeName()) {
+ case DATETIME:
+ return ((ReadableInstant) beamValue).getMillis();
+ case BYTE:
+ case INT16:
+ case INT32:
+ case INT64:
+ case DECIMAL:
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ case BOOLEAN:
+ return beamValue;
+ case ARRAY:
+ return ((List<?>) beamValue)
+ .stream()
+ .map(elem -> fieldToAvatica(type.getCollectionElementType(), elem))
+ .collect(Collectors.toList());
+ case MAP:
+ return ((Map<?, ?>) beamValue)
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> fieldToAvatica(type.getCollectionElementType(), entry.getValue())));
+ case ROW:
+ // TODO: needs to be a Struct
+ return beamValue;
+ default:
+ throw new IllegalStateException(
+ String.format("Unreachable case for Beam typename %s", type.getTypeName()));
+ }
+ }
+
private static Enumerable<Object> count(PipelineOptions options, BeamRelNode node) {
Pipeline pipeline = Pipeline.create(options);
BeamSqlRelUtils.toPCollection(pipeline, node).apply(ParDo.of(new RowCounter()));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 117971)
Time Spent: 50m (was: 40m)
> JDBC driver cannot support TIMESTAMP data type
> ----------------------------------------------
>
> Key: BEAM-4700
> URL: https://issues.apache.org/jira/browse/BEAM-4700
> Project: Beam
> Issue Type: Bug
> Components: dsl-sql
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Blocker
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Avatica allows column representation to be customized, so a timestamp can be
> stored as a variety of types. Joda ReadableInstant is none of these types:
> https://github.com/apache/calcite-avatica/blob/acb675de97b9b0743c09368820a770e2ceda05f8/core/src/main/java/org/apache/calcite/avatica/util/AbstractCursor.java#L162
> By default, it seems to be configured to store {{TIMESTAMP}} columns as
> {{long}} values. If you run the SQL shell and select a {{TIMESTAMP}} column,
> you get:
> {code}
> java.lang.ClassCastException: org.joda.time.Instant cannot be cast to
> java.lang.Number
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.avatica.util.AbstractCursor$NumberAccessor.getNumber(AbstractCursor.java:726)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.avatica.util.AbstractCursor$TimestampFromNumberAccessor.getString(AbstractCursor.java:1026)
> at
> org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.avatica.AvaticaResultSet.getString(AvaticaResultSet.java:225)
> at sqlline.Rows$Row.<init>(Rows.java:183)
> {code}
> So, essentially, Beam SQL Shell does not support timestamps.
> We may be able to:
> - override how the accessor for our existing storage is created
> - configure what the column representation is (this doesn't really help,
> since none of the choices are ours)
> - convert timestamps to longs in BeamEnumerableConverter; not sure how many
> conversions will be required here
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)