This is an automated email from the ASF dual-hosted git repository. imbruced pushed a commit to branch confluent in repository https://gitbox.apache.org/repos/asf/sedona.git
commit 55ec2c76855782ceb012bcb72ede8a5155216421 Author: pawelkocinski <[email protected]> AuthorDate: Sun Mar 30 22:01:28 2025 +0200 SEDONA-721 Apply requested changes. --- .../sedona/flink/confluent/Constructors.java | 44 ++++++++ .../apache/sedona/flink/confluent/Functions.java | 33 ++++++ .../sedona/flink/confluent/GeometrySerde.java | 47 +++++++++ .../constructors/ST_GeomCollFromText.java | 39 +++++++ .../confluent/constructors/ST_GeomFromEWKT.java | 32 ++++++ .../confluent/constructors/ST_GeomFromGML.java | 39 +++++++ .../confluent/constructors/ST_GeomFromGeoHash.java | 41 ++++++++ .../confluent/constructors/ST_GeomFromGeoJSON.java | 35 +++++++ .../confluent/constructors/ST_GeomFromKML.java | 33 ++++++ .../confluent/constructors/ST_GeomFromText.java | 39 +++++++ .../confluent/constructors/ST_GeomFromWKB.java | 40 ++++++++ .../confluent/constructors/ST_GeomFromWKT.java | 39 +++++++ .../constructors/ST_GeometryFromText.java | 39 +++++++ .../confluent/constructors/ST_LineFromText.java | 44 ++++++++ .../confluent/constructors/ST_LineFromWKB.java | 61 +++++++++++ .../constructors/ST_LineStringFromText.java | 42 ++++++++ .../confluent/constructors/ST_MLineFromText.java | 38 +++++++ .../confluent/constructors/ST_MPointFromText.java | 38 +++++++ .../confluent/constructors/ST_MPolyFromText.java | 38 +++++++ .../confluent/constructors/ST_MakeEnvelope.java | 47 +++++++++ .../confluent/constructors/ST_MakePointM.java | 34 ++++++ .../flink/confluent/constructors/ST_Point.java | 31 ++++++ .../constructors/ST_PointFromGeoHash.java | 38 +++++++ .../confluent/constructors/ST_PointFromText.java | 42 ++++++++ .../confluent/constructors/ST_PointFromWKB.java | 61 +++++++++++ .../flink/confluent/constructors/ST_PointM.java | 43 ++++++++ .../flink/confluent/constructors/ST_PointZ.java | 43 ++++++++ .../flink/confluent/constructors/ST_PointZM.java | 45 ++++++++ .../constructors/ST_PolygonFromEnvelope.java | 44 ++++++++ .../confluent/constructors/ST_PolygonFromText.java | 44 ++++++++ pyflink/.python-version | 1 + pyflink/README.md | 0 pyflink/cli.py | 23 +++++ pyflink/pyproject.toml | 9 ++ pyflink/sedonuts/cli/__init__.py | 0 pyflink/sedonuts/cli/confluent/__init__.py | 0 pyflink/sedonuts/cli/confluent/functions.py | 27 +++++ pyflink/sedonuts/cli/confluent/generate_ddl.py | 43 ++++++++ .../sedonuts/cli/confluent/generate_terraform.py | 20 ++++ pyflink/sedonuts/cli/confluent/insert_with_cli.py | 114 +++++++++++++++++++++ pyflink/sedonuts/cli/confluent/template.py | 3 + 41 files changed, 1473 insertions(+) diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java new file mode 100644 index 0000000000..57aada0c32 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/Constructors.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent; + +import org.apache.sedona.common.enums.FileDataSplitter; +import org.apache.sedona.common.enums.GeometryType; +import org.apache.sedona.common.utils.FormatUtils; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; + +public class Constructors { + + public static Geometry getGeometryByType( + String geom, String inputDelimiter, GeometryType geometryType) throws ParseException { + FileDataSplitter delimiter = + inputDelimiter == null + ? FileDataSplitter.CSV + : FileDataSplitter.getFileDataSplitter(inputDelimiter); + FormatUtils<Geometry> formatUtils = new FormatUtils<>(delimiter, false, geometryType); + return formatUtils.readGeometry(geom); + } + + public static Geometry getGeometryByFileData(String wktString, FileDataSplitter dataSplitter) + throws ParseException { + FormatUtils<Geometry> formatUtils = new FormatUtils<>(dataSplitter, false); + return formatUtils.readGeometry(wktString); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java b/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java new file mode 100644 index 0000000000..70ecdc2783 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/Functions.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.locationtech.jts.geom.Geometry; + +public class Functions { + static class ST_Area extends ScalarFunction { + @DataTypeHint("Double") + public Double eval(byte[] o) { + Geometry geom = GeometrySerde.deserialize(o); + return org.apache.sedona.common.Functions.area(geom); + } + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java b/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java new file mode 100644 index 0000000000..87b1e61537 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/GeometrySerde.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent; + +import java.util.Arrays; +import org.apache.sedona.common.Constructors; +import org.apache.sedona.common.Functions; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.ParseException; + +public class GeometrySerde { + + public static GeometryFactory GEOMETRY_FACTORY = new GeometryFactory(); + + public static byte[] serialize(Geometry geom) { + return Functions.asEWKB(geom); + } + + public static Geometry deserialize(byte[] bytes) { + try { + return Constructors.geomFromWKB(bytes); + } catch (ParseException e) { + String msg = + String.format( + "Failed to parse WKB(printed through Arrays.toString(bytes)): %s, error: %s", + Arrays.toString(bytes), e.getMessage()); + throw new IllegalArgumentException(msg); + } + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java new file mode 100644 index 0000000000..413f01a899 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomCollFromText.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomCollFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.geomCollFromText(wkt, srid)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomCollFromText(wkt, 0)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java new file mode 100644 index 0000000000..9f481c1473 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromEWKT.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromEWKT extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromEWKT(wktString)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java new file mode 100644 index 0000000000..e4597457d9 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGML.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.gml2.GMLReader; + +public class ST_GeomFromGML extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String gml) throws ParseException { + GMLReader reader = new GMLReader(); + try { + return GeometrySerde.serialize(reader.read(gml, new GeometryFactory())); + } catch (Exception e) { + throw new ParseException(e); + } + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java new file mode 100644 index 0000000000..ff25a6e96c --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoHash.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.utils.GeoHashDecoder; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromGeoHash extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String value, @DataTypeHint("Int") Integer precision) + throws ParseException, GeoHashDecoder.InvalidGeoHashException { + // The default precision is the geohash length. Otherwise, use the precision given by the user + return GeometrySerde.serialize(GeoHashDecoder.decode(value, precision)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String value) + throws ParseException, GeoHashDecoder.InvalidGeoHashException { + return eval(value, null); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java new file mode 100644 index 0000000000..9c52ab6d88 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromGeoJSON.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.FileDataSplitter; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromGeoJSON extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String geoJson) throws ParseException { + return GeometrySerde.serialize(getGeometryByFileData(geoJson, FileDataSplitter.GEOJSON)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java new file mode 100644 index 0000000000..25286f4664 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromKML.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; +import org.locationtech.jts.io.kml.KMLReader; + +public class ST_GeomFromKML extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String kml) throws ParseException { + return GeometrySerde.serialize(new KMLReader().read(kml)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java new file mode 100644 index 0000000000..9f81c5f595 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromText.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString, 0)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java new file mode 100644 index 0000000000..d2dcefbb6f --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKB.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.FileDataSplitter; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromWKB extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wkbString) throws ParseException { + return GeometrySerde.serialize(getGeometryByFileData(wkbString, FileDataSplitter.WKB)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKB(wkb)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java new file mode 100644 index 0000000000..4144c1778d --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeomFromWKT.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeomFromWKT extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString, 0)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java new file mode 100644 index 0000000000..9a184e21ea --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_GeometryFromText.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_GeometryFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.geomFromWKT(wktString, 0)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wktString, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.geomFromWKT(wktString, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java new file mode 100644 index 0000000000..e1205aa136 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromText.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.GeometryType; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_LineFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("String") String lineString, @DataTypeHint("String") String inputDelimiter) + throws ParseException { + // The default delimiter is comma. Otherwise, use the delimiter given by the user + return GeometrySerde.serialize( + getGeometryByType(lineString, inputDelimiter, GeometryType.LINESTRING)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String lineString) throws ParseException { + return eval(lineString, null); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java new file mode 100644 index 0000000000..792adff004 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineFromWKB.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.FileDataSplitter; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.LineString; +import org.locationtech.jts.io.ParseException; + +public class ST_LineFromWKB extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wkbString) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof LineString) { + return GeometrySerde.serialize(geometry); + } + return null; + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wkbString, int srid) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof LineString) { + geometry = org.apache.sedona.common.Functions.setSRID(geometry, srid); + return GeometrySerde.serialize(geometry); + } + return null; // Return null if geometry is not a Linestring + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.lineFromWKB(wkb)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Bytes") byte[] wkb, int srid) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.lineFromWKB(wkb, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java new file mode 100644 index 0000000000..8a5ebc6d61 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_LineStringFromText.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_LineStringFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("String") String lineString, @DataTypeHint("String") String inputDelimiter) + throws ParseException { + // The default delimiter is comma. Otherwise, use the delimiter given by the user + return GeometrySerde.serialize( + new org.apache.sedona.flink.expressions.Constructors.ST_LineFromText() + .eval(lineString, inputDelimiter)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String lineString) throws ParseException { + return eval(lineString, null); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java new file mode 100644 index 0000000000..449f110136 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MLineFromText.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_MLineFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mLineFromText(wkt, srid)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mLineFromText(wkt, 0)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java new file mode 100644 index 0000000000..968325182b --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPointFromText.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_MPointFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPointFromText(wkt, srid)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPointFromText(wkt, 0)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java new file mode 100644 index 0000000000..bdbe27805f --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MPolyFromText.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_MPolyFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt, @DataTypeHint("Int") Integer srid) + throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPolyFromText(wkt, srid)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint(value = "String") String wkt) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.mPolyFromText(wkt, 0)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java new file mode 100644 index 0000000000..2f276f55e1 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakeEnvelope.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_MakeEnvelope extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double minX, + @DataTypeHint("Double") Double minY, + @DataTypeHint("Double") Double maxX, + @DataTypeHint("Double") Double maxY, + @DataTypeHint("Integer") Integer srid) { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.makeEnvelope(minX, minY, maxX, maxY, srid)); + } + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double minX, + @DataTypeHint("Double") Double minY, + @DataTypeHint("Double") Double maxX, + @DataTypeHint("Double") Double maxY) { + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.makeEnvelope(minX, minY, maxX, maxY)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java new file mode 100644 index 0000000000..53f4874232 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_MakePointM.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_MakePointM extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double m) { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.makePointM(x, y, m)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java new file mode 100644 index 0000000000..2a71472ef1 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_Point.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_Point extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.point(x, y)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java new file mode 100644 index 0000000000..985aca4f82 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromGeoHash.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_PointFromGeoHash extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String value, @DataTypeHint("Int") Integer precision) { + // The default precision is the geohash length. Otherwise, use the precision given by the user + return GeometrySerde.serialize( + org.apache.sedona.common.Constructors.pointFromGeoHash(value, precision)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String value) { + return eval(value, null); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java new file mode 100644 index 0000000000..a2e624bf69 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromText.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.GeometryType; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_PointFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) + throws ParseException { + return GeometrySerde.serialize(getGeometryByType(s, inputDelimiter, GeometryType.POINT)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String s) throws ParseException { + return eval(s, null); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java new file mode 100644 index 0000000000..b51ae97c67 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointFromWKB.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByFileData; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.FileDataSplitter; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.io.ParseException; + +public class ST_PointFromWKB extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wkbString) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof Point) { + return GeometrySerde.serialize(geometry); + } + return null; // Return null if geometry is not a Point + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String wkbString, int srid) throws ParseException { + Geometry geometry = getGeometryByFileData(wkbString, FileDataSplitter.WKB); + if (geometry instanceof Point) { + geometry = org.apache.sedona.common.Functions.setSRID(geometry, srid); + return GeometrySerde.serialize(geometry); + } + return null; // Return null if geometry is not a Point + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Bytes") byte[] wkb) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointFromWKB(wkb)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("Bytes") byte[] wkb, int srid) throws ParseException { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointFromWKB(wkb, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java new file mode 100644 index 0000000000..70c42066eb --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointM.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_PointM extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double m) { + return eval(x, y, m, 0); + } + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double m, + @DataTypeHint("Integer") Integer srid) { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointM(x, y, m, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java new file mode 100644 index 0000000000..17dfffd392 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZ.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_PointZ extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double z) { + return eval(x, y, z, 0); + } + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double z, + @DataTypeHint("Integer") Integer srid) { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointZ(x, y, z, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java new file mode 100644 index 0000000000..1836b60bf2 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PointZM.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; + +public class ST_PointZM extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double z, + @DataTypeHint("Double") Double m) { + return eval(x, y, z, m, 0); + } + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double x, + @DataTypeHint("Double") Double y, + @DataTypeHint("Double") Double z, + @DataTypeHint("Double") Double m, + @DataTypeHint("Integer") Integer srid) { + return GeometrySerde.serialize(org.apache.sedona.common.Constructors.pointZM(x, y, z, m, srid)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java new file mode 100644 index 0000000000..9595264495 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromEnvelope.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.geom.Coordinate; +import org.locationtech.jts.geom.GeometryFactory; + +public class ST_PolygonFromEnvelope extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("Double") Double minX, + @DataTypeHint("Double") Double minY, + @DataTypeHint("Double") Double maxX, + @DataTypeHint("Double") Double maxY) { + Coordinate[] coordinates = new Coordinate[5]; + coordinates[0] = new Coordinate(minX, minY); + coordinates[1] = new Coordinate(minX, maxY); + coordinates[2] = new Coordinate(maxX, maxY); + coordinates[3] = new Coordinate(maxX, minY); + coordinates[4] = coordinates[0]; + GeometryFactory geometryFactory = new GeometryFactory(); + return GeometrySerde.serialize(geometryFactory.createPolygon(coordinates)); + } +} diff --git a/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java new file mode 100644 index 0000000000..53a8409e00 --- /dev/null +++ b/flink/src/main/java/org/apache/sedona/flink/confluent/constructors/ST_PolygonFromText.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.flink.confluent.constructors; + +import static org.apache.sedona.flink.confluent.Constructors.getGeometryByType; + +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.sedona.common.enums.GeometryType; +import org.apache.sedona.flink.confluent.GeometrySerde; +import org.locationtech.jts.io.ParseException; + +public class ST_PolygonFromText extends ScalarFunction { + + @DataTypeHint("Bytes") + public byte[] eval( + @DataTypeHint("String") String polygonString, @DataTypeHint("String") String inputDelimiter) + throws ParseException { + // The default delimiter is comma. Otherwise, use the delimiter given by the user + return GeometrySerde.serialize( + getGeometryByType(polygonString, inputDelimiter, GeometryType.POLYGON)); + } + + @DataTypeHint("Bytes") + public byte[] eval(@DataTypeHint("String") String polygonString) throws ParseException { + return eval(polygonString, null); + } +} diff --git a/pyflink/.python-version b/pyflink/.python-version new file mode 100644 index 0000000000..2c0733315e --- /dev/null +++ b/pyflink/.python-version @@ -0,0 +1 @@ +3.11 diff --git a/pyflink/README.md b/pyflink/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pyflink/cli.py b/pyflink/cli.py new file mode 100644 index 0000000000..2b0b5e09cf --- /dev/null +++ b/pyflink/cli.py @@ -0,0 +1,23 @@ +import typer + +from sedonuts.cli.confluent.generate_ddl import create_ddl_command +from sedonuts.cli.confluent.generate_terraform import create_terraform +from sedonuts.cli.confluent.insert_with_cli import create_confluent_cli_command + + +def main(): + terraform_command = create_terraform() + ddl_command = create_ddl_command() + confluent_cli_command = create_confluent_cli_command() + + app = typer.Typer() + + app.add_typer(terraform_command, name="terraform") + app.add_typer(ddl_command, name="ddl") + app.add_typer(confluent_cli_command, name="cli") + + app() + + +if __name__ == "__main__": + main() diff --git a/pyflink/pyproject.toml b/pyflink/pyproject.toml new file mode 100644 index 0000000000..befa4ff5b9 --- /dev/null +++ b/pyflink/pyproject.toml @@ -0,0 +1,9 @@ +[project] +name = "pyflink" +version = "0.1.0" +description = "Add your description here" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "typer>=0.15.2", +] diff --git a/pyflink/sedonuts/cli/__init__.py b/pyflink/sedonuts/cli/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pyflink/sedonuts/cli/confluent/__init__.py b/pyflink/sedonuts/cli/confluent/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pyflink/sedonuts/cli/confluent/functions.py b/pyflink/sedonuts/cli/confluent/functions.py new file mode 100644 index 0000000000..f104b5d201 --- /dev/null +++ b/pyflink/sedonuts/cli/confluent/functions.py @@ -0,0 +1,27 @@ +import os.path +import zipfile + + +def download_jar(sedona_version, scala_version) -> str: + pass + + +def list_classes_in_jar(jar_file_path): + with zipfile.ZipFile(jar_file_path, 'r') as jar: + jar_contents = jar.namelist() + + class_files = [ + file for file in jar_contents + if file.endswith('.class') and "org/apache/sedona/flink/confluent/constructors" in file and "ST_" in file + ] + + return class_files + + +def list_functions(sedona_version, scala_version, path: str | None = None): + if path is None: + path = download_jar(sedona_version, scala_version) + + path = os.path.join(path, f"sedona-flink-shaded_{scala_version}-{sedona_version}-SNAPSHOT.jar") + + return list_classes_in_jar(path) diff --git a/pyflink/sedonuts/cli/confluent/generate_ddl.py b/pyflink/sedonuts/cli/confluent/generate_ddl.py new file mode 100644 index 0000000000..f811cdbd89 --- /dev/null +++ b/pyflink/sedonuts/cli/confluent/generate_ddl.py @@ -0,0 +1,43 @@ +import typer + +from sedonuts.cli.confluent.functions import list_functions +from sedonuts.cli.confluent.template import function_template + + +def generate_ddl( + file: str = typer.Option(None, "--file", "-f", help="Path to the Terraform configuration file"), + artifact_id: str = typer.Option(..., "--artifact-id", "-a", help="Artifact ID of the JAR file") +): + files = list_functions( + "1.8.0", + "2.12", + "/Users/pawelkocinski/Desktop/projects/sed/sedona/flink-shaded/target" + ) + + templates = [] + + for f in files: + tail = f.split("/")[-1].replace(".class", "") + class_name = tail.split("$")[1] + location = tail.replace("$", ".") + # class_name, location, + templates.append(function_template.format(artifact_id)) + + if file is None: + for template in templates: + print(template) + print("") + + return + + with open(file, "w") as f: + for template in templates: + f.write(template) + f.write("\n\n") + + +def create_ddl_command(): + ddl_command = typer.Typer(name="ddl") + ddl_command.command(name="generate")(generate_ddl) + + return ddl_command diff --git a/pyflink/sedonuts/cli/confluent/generate_terraform.py b/pyflink/sedonuts/cli/confluent/generate_terraform.py new file mode 100644 index 0000000000..5d131997f3 --- /dev/null +++ b/pyflink/sedonuts/cli/confluent/generate_terraform.py @@ -0,0 +1,20 @@ +import typer + + +def generate( + file: str = typer.Option(None, "--file", "-f", help="Path to the Terraform configuration file"), +): + if file is None: + pass + + + + +def create_terraform(): + terraform_app = typer.Typer(name="terraform") + terraform_app.command(name="generate")( + generate + ) + + + return terraform_app diff --git a/pyflink/sedonuts/cli/confluent/insert_with_cli.py b/pyflink/sedonuts/cli/confluent/insert_with_cli.py new file mode 100644 index 0000000000..2a061a55ac --- /dev/null +++ b/pyflink/sedonuts/cli/confluent/insert_with_cli.py @@ -0,0 +1,114 @@ +import json +import time +from dataclasses import dataclass + +import typer + +from sedonuts.cli.confluent.functions import list_functions +from sedonuts.cli.confluent.template import function_template +import concurrent.futures + +import subprocess + + +@dataclass +class FlinkSQLStatement: + name: str + status: str + + +def describe_flink_sql_statement(name: str, environment: str): + # The command to run + command = [ + 'confluent', 'flink', 'statement', 'describe', + name, "--environment", environment, "--output", "json", + "--cloud", "aws", "--region", "us-east-1" + ] + + # Run the command using subprocess.run + result = subprocess.run(command, capture_output=True, text=True) + + # Check the result and print output or error + if result.returncode == 0: + result = result.stdout + metadata = json.loads(result) + + return FlinkSQLStatement( + name=metadata["name"], + status=metadata["status"], + ) + + else: + print("Command failed:") + print(result.stderr) + + +def run_flink_sql_statement(sql: str, compute_pool: str, database: str, environment: str, function_name: str): + print(f"Creating function for {function_name}") + command = [ + 'confluent', 'flink', 'statement', 'create', '--sql', + sql, '--compute-pool', compute_pool, '--database', database, + "--environment", environment, "--output", "json" + ] + + result = subprocess.run(command, capture_output=True, text=True) + + if result.returncode == 0: + result = result.stdout + metadata = json.loads(result) + + name = metadata["name"] + + metadata_update = describe_flink_sql_statement(name, environment) + + while metadata_update.status == "RUNNING" or metadata_update.status == "PENDING": + time.sleep(2) + metadata_update = describe_flink_sql_statement(name, environment) + + if metadata_update.status == "FAILED": + print(f"Command failed for {function_name}") + return + + if metadata_update.status == "COMPLETED": + print(f"Command succeeded for {function_name}") + return + + else: + print("Command failed:") + print(result.stderr.strip()) # The standard error output + print("") + + +def apply( + file: str = typer.Option(None, "--file", "-f", help="Path to the Terraform configuration file"), + artifact_id: str = typer.Option(..., "--artifact-id", "-a", help="Artifact ID of the JAR file"), + database: str = typer.Option(..., "--database", "-d", help="Database name"), + compute_pool: str = typer.Option(..., "--compute-pool", "-c", help="Compute pool name"), + environment: str = typer.Option(..., "--environment", "-e", help="Environment name"), +): + files = list_functions( + "1.8.0", + "2.12", + "/Users/pawelkocinski/Desktop/projects/sed/sedona/flink-shaded/target" + ) + + tasks = [] + for f in files: + class_name = f.split("/")[-1].replace(".class", "") + + sql = function_template.format(class_name, class_name, artifact_id) + + tasks.append([sql, class_name]) + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + task_pool = [executor.submit(run_flink_sql_statement, sql, compute_pool, database, environment, class_name) for sql, class_name in tasks] + + for future in concurrent.futures.as_completed(task_pool): + task_result = future.result() + + +def create_confluent_cli_command(): + cli_command = typer.Typer(name="cli") + cli_command.command(name="apply")(apply) + + return cli_command diff --git a/pyflink/sedonuts/cli/confluent/template.py b/pyflink/sedonuts/cli/confluent/template.py new file mode 100644 index 0000000000..985d7f9a9e --- /dev/null +++ b/pyflink/sedonuts/cli/confluent/template.py @@ -0,0 +1,3 @@ +function_template = """ +CREATE FUNCTION {} AS 'org.apache.sedona.flink.confluent.constructors.{}' USING JAR 'confluent-artifact://{}'; +""".strip()
