This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch swift-sdk in repository https://gitbox.apache.org/repos/asf/beam.git
commit cf29f48a462fb26962e5cb282c821718fdef5ae4 Author: Byron Ellis <[email protected]> AuthorDate: Wed Aug 9 17:52:28 2023 -0700 Moving experimental Swift SDK code into Beam repo structure. Cleaning up and adding tests and Apache License labels where needed. --- sdks/swift/Documentation/INTERNALS.md | 4 + sdks/swift/Package.resolved | 113 ++++++++++++ sdks/swift/Package.swift | 62 +++++++ sdks/swift/Scripts/generate_protobuf.sh | 97 +++++++++++ sdks/swift/Sources/ApacheBeam/ApacheBeam.swift | 20 +++ .../ApacheBeam/Client/ApiServiceDescriptor.swift | 63 +++++++ .../Sources/ApacheBeam/Coders/BeamValue.swift | 70 ++++++++ .../Sources/ApacheBeam/Coders/Coder+Decoding.swift | 79 +++++++++ .../Sources/ApacheBeam/Coders/Coder+Encoding.swift | 109 ++++++++++++ sdks/swift/Sources/ApacheBeam/Coders/Coder.swift | 190 +++++++++++++++++++++ .../Sources/ApacheBeam/Core/ArtifactInfo.swift | 31 ++++ .../Sources/ApacheBeam/Core/Environment.swift | 44 +++++ sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift | 62 +++++++ sdks/swift/Sources/ApacheBeam/Core/Windowing.swift | 55 ++++++ .../ApacheBeam/Internal/Data+Decoding.swift | 72 ++++++++ .../ApacheBeam/Internal/Data+Encoding.swift | 35 ++++ .../ApacheBeam/Internal/Date+Timestamp.swift | 27 +++ .../ApacheBeam/Internal/ProtoConversion.swift | 25 +++ .../Sources/ApacheBeam/Internal/String+Urns.swift | 38 +++++ .../Tests/ApacheBeamTests/ApacheBeamTests.swift | 12 ++ .../Tests/ApacheBeamTests/Coders/CoderTests.swift | 62 +++++++ 21 files changed, 1270 insertions(+) diff --git a/sdks/swift/Documentation/INTERNALS.md b/sdks/swift/Documentation/INTERNALS.md new file mode 100644 index 00000000000..757e1b44602 --- /dev/null +++ b/sdks/swift/Documentation/INTERNALS.md @@ -0,0 +1,4 @@ +# Internals + + + diff --git a/sdks/swift/Package.resolved b/sdks/swift/Package.resolved new file mode 100644 index 00000000000..cc4a106ff4a --- /dev/null +++ b/sdks/swift/Package.resolved @@ -0,0 +1,113 @@ +{ + "pins" : [ + { + "identity" : "grpc-swift", + 
"kind" : "remoteSourceControl", + "location" : "https://github.com/grpc/grpc-swift.git", + "state" : { + "revision" : "735d88f8196467f24189896f29c4625af7cbd072", + "version" : "1.19.0" + } + }, + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "6c89474e62719ddcc1e9614989fff2f68208fe10", + "version" : "1.1.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "937e904258d22af6e447a0b72c0bc67583ef64a2", + "version" : "1.0.4" + } + }, + { + "identity" : "swift-docc-plugin", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-docc-plugin", + "state" : { + "revision" : "26ac5758409154cc448d7ab82389c520fa8a8247", + "version" : "1.3.0" + } + }, + { + "identity" : "swift-docc-symbolkit", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-docc-symbolkit", + "state" : { + "revision" : "b45d1f2ed151d057b54504d653e0da5552844e34", + "version" : "1.0.0" + } + }, + { + "identity" : "swift-log", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-log.git", + "state" : { + "revision" : "32e8d724467f8fe623624570367e3d50c5638e46", + "version" : "1.5.2" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio.git", + "state" : { + "revision" : "cf281631ff10ec6111f2761052aa81896a83a007", + "version" : "2.58.0" + } + }, + { + "identity" : "swift-nio-extras", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-extras.git", + "state" : { + "revision" : "0e0d0aab665ff1a0659ce75ac003081f2b1c8997", + "version" : "1.19.0" + } + }, + { + "identity" : "swift-nio-http2", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-http2.git", + "state" : 
{ + "revision" : "a8ccf13fa62775277a5d56844878c828bbb3be1a", + "version" : "1.27.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl.git", + "state" : { + "revision" : "e866a626e105042a6a72a870c88b4c531ba05f83", + "version" : "2.24.0" + } + }, + { + "identity" : "swift-nio-transport-services", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-transport-services.git", + "state" : { + "revision" : "5fd1458c245d5741b3c8ebe55489f590c6ca8f15", + "version" : "1.18.0" + } + }, + { + "identity" : "swift-protobuf", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-protobuf.git", + "state" : { + "revision" : "ce20dc083ee485524b802669890291c0d8090170", + "version" : "1.22.1" + } + } + ], + "version" : 2 +} diff --git a/sdks/swift/Package.swift b/sdks/swift/Package.swift new file mode 100644 index 00000000000..e6b67948631 --- /dev/null +++ b/sdks/swift/Package.swift @@ -0,0 +1,62 @@ +// swift-tools-version: 5.9 +// The swift-tools-version declares the minimum version of Swift required to build this package. + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import PackageDescription + +let dependencies: [Package.Dependency] = [ + // Core Dependencies + .package(url: "https://github.com/grpc/grpc-swift.git", from: "1.19.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), + + // Additional Transform Dependencies + + + // Swift Package Manager Plugins + .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0") +] + +let package = Package( + name: "ApacheBeam", + platforms: [ + .macOS("13.0") + ], + products: [ + // Products define the executables and libraries a package produces, making them visible to other packages. + .library( + name: "ApacheBeam", + targets: ["ApacheBeam"]), + ], + dependencies: dependencies, + targets: [ + // Targets are the basic building blocks of a package, defining a module or a test suite. + // Targets can depend on other targets in this package and products from dependencies. + .target( + name: "ApacheBeam", + dependencies: [ + .product(name:"GRPC",package:"grpc-swift"), + .product(name: "Logging",package:"swift-log") + ] + ), + .testTarget( + name: "ApacheBeamTests", + dependencies: ["ApacheBeam"]), + ] +) diff --git a/sdks/swift/Scripts/generate_protobuf.sh b/sdks/swift/Scripts/generate_protobuf.sh new file mode 100755 index 00000000000..8210868a804 --- /dev/null +++ b/sdks/swift/Scripts/generate_protobuf.sh @@ -0,0 +1,97 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +set -e + +read -r -d '\0' LICENSE << EOM +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +\0 +EOM + +if [[ -z "$(which protoc)" ]]; then + echo "protoc not found on path" + exit 1 +fi + +SCRIPT_DIR="$( realpath "$( dirname "${BASH_SOURCE[0]}" )" )" +if [[ -z $SCRIPT_DIR ]]; then + echo "unable to resolve path to script" + exit 1 +fi + +SDK_PATH="$( realpath "$(dirname $SCRIPT_DIR)/.." 
)" +if [[ -z "$SDK_PATH" ]]; then + echo "unable to resolve path to project root" + exit 1 +fi + +PROJECT_ROOT="$(realpath "$(dirname $SCRIPT_DIR)/../..")" +if [[ -z "$PROJECT_ROOT" ]]; then + echo "unable to resolve path to project root" + exit 1 +fi + +# You can manually specify this otherwise we assume we are in the repo +if [[ -z "$BEAM_PATH" ]]; then + BEAM_PATH=${PROJECT_ROOT} +fi + +function gen_beam_model_protos() { + cd "$PROJECT_ROOT" + echo "Generating Beam Model Protos in $PROJECT_ROOT from $BEAM_PATH" + + declare -a TO_GEN=( + '/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/*.proto' + '/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/*.proto' + '/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/*.proto' + ) + + declare -a INCLUDES=() + for package in "${TO_GEN[@]}" + do + INCLUDES+=("-I${BEAM_PATH}/${package%/org*}") + done + + mkdir -p $PROJECT_ROOT/sdks/swift/Sources/ApacheBeam/Generated/{Model,GRPC} + for package in "${TO_GEN[@]}" + do + echo "${INCLUDES[@]}" + protoc \ + "${INCLUDES[@]}" \ + --swift_out="$PROJECT_ROOT/sdks/swift/Sources/ApacheBeam/Generated/Model" \ + --grpc-swift_out="$PROJECT_ROOT/sdks/swift/Sources/ApacheBeam/Generated/GRPC" \ + ${BEAM_PATH}$package + done +} + +gen_beam_model_protos diff --git a/sdks/swift/Sources/ApacheBeam/ApacheBeam.swift b/sdks/swift/Sources/ApacheBeam/ApacheBeam.swift new file mode 100644 index 00000000000..ab09c722187 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/ApacheBeam.swift @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +enum ApacheBeamError : Error { + case runtimeError(String) +} diff --git a/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift new file mode 100644 index 00000000000..40e1b662d8f --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Client/ApiServiceDescriptor.swift @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/// Representation of the API Service Descriptors used to communicate with runners (and vice versa) +public struct ApiServiceDescriptor { + + public enum EncodedAs { + case json,textproto + } + + let url: String + + public init(host:String,port:Int) { + self.url = "\(host):\(port)" + } + public init(unixAddress:String) { + self.url = "unix://\(unixAddress)" + } +} + +extension ApiServiceDescriptor { + init(proto: Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor) { + self.url = proto.url + } +} + +extension ApiServiceDescriptor : ProtoConversion { + + typealias Proto = Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor + var proto: Org_Apache_Beam_Model_Pipeline_V1_ApiServiceDescriptor { + .with { $0.url = self.url } + } + + +} + + + +public extension ApiServiceDescriptor { + static func from(env:String,format:EncodedAs = .textproto) throws -> ApiServiceDescriptor { + switch format { + case .textproto: + ApiServiceDescriptor(proto: try .init(textFormatString: env)) + case .json: + ApiServiceDescriptor(proto: try .init(jsonString: env)) + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift new file mode 100644 index 00000000000..054356ed6f1 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Coders/BeamValue.swift @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation + +/// An enum representing values coming over the FnApi Data Plane. +public indirect enum BeamValue { + /// A value not representable in the Swift SDK + case invalid(String) + + // Scalar values + + /// Bytes coded + case bytes(Data?) + /// UTF8 Strings + case string(String?) + /// Integers (Signed 64-bit) + case integer(Int?) + /// Doubles + case double(Double?) + /// Booleans + case boolean(Bool?) + /// A window + case window(Window) + + // Composite Values + + /// An iterable + case array([BeamValue]) + /// A key-value pair + case kv(BeamValue,BeamValue) + /// A windowed value + case windowed(BeamValue,Date,UInt8,BeamValue) + + /// Convenience method for extacting the base value from one + /// of the scalar representations. + var baseValue : Any? { + switch self { + case let .bytes(d): + d + case let .string(s): + s + case let .integer(i): + i + case let .double(d): + d + case let .boolean(b): + b + case let .window(w): + w + default: + nil as Any? + } + } + +} diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift new file mode 100644 index 00000000000..48c1cb9db43 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Decoding.swift @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation + +public extension Coder { + + /// Decodes a raw data block into a BeamValue for further processing + func decode(_ data:inout Data) throws -> BeamValue { + switch self { + // Scalar values check for size 0 input data and return null if that's a problem + + // TODO: Endian and other encoding checks + + case .bytes: + return .bytes(data.count == 0 ? nil : try data.subdata()) + case .string: + return .string(data.count == 0 ? nil : String(data: try data.subdata(), encoding: .utf8)) + case .varint: + return .integer(data.count == 0 ? nil : try data.varint()) + case .fixedint: + return .integer(data.count == 0 ? nil : try data.next(Int.self)) + case .byte: + return .integer(data.count == 0 ? nil : Int(try data.next(UInt8.self))) + case .boolean: + return .boolean(data.count == 0 ? nil : try data.next(UInt8.self) != 0) + case .double: + return .double(data.count == 0 ? 
nil : try data.next(Double.self)) + case .globalwindow: + return .window(.global) + case .lengthprefix(let coder): // Length prefix basically serves to make values nullable + var subdata = try data.subdata() + return try coder.decode(&subdata) + case let .keyvalue(keyCoder, valueCoder): + return .kv(try keyCoder.decode(&data), try valueCoder.decode(&data)) + case let .iterable(coder): + let length = try data.next(Int32.self) + return .array(try (0..<length).map({ _ in try coder.decode(&data) })) + case let .windowedvalue(valueCoder, windowCoder): + let timestamp = try data.next(Int64.self).byteSwapped &+ Int64(-9223372036854775808) + let windowCount = try data.next(Int32.self).byteSwapped + if windowCount > 1 { + throw ApacheBeamError.runtimeError("Windowed values with > 1 window not yet supported") + } + let window = try windowCoder.decode(&data) + + //TODO: Actually handle pane info + let pane = try data.next(UInt8.self) + switch ((pane >> 4) & 0x0f) { + case 0x0: + break + case 0x1: + _ = try data.varint() + case 0x2: + _ = try data.varint() + _ = try data.varint() + default: + throw ApacheBeamError.runtimeError("Invalid pane encoding \(String(pane,radix:2))") + } + return .windowed(try valueCoder.decode(&data), Date(millisecondsSince1970: timestamp), pane, window) + default: + throw ApacheBeamError.runtimeError("Decoding of \(self.urn) coders not supported.") + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift new file mode 100644 index 00000000000..fffeb351a53 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder+Encoding.swift @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation + +public extension Coder { + + func encode(_ value: Any?) throws -> Data { + var data = Data() + try self.encode(value,data: &data) + return data + } + func encode(_ value: Any?,data: inout Data) throws { + switch self { + // Scalar values check for size 0 input data and return null if that's a problem + + // TODO: Endian and other encoding checks + + case .bytes: + if let v = value as? Data { + data.varint(v.count) + data.append(v) + } + case .string: + if let v = value as? String { + let d = Data(v.utf8) + data.varint(d.count) + data.append(d) + } + case .varint: + if let v = value as? Int { + data.varint(v) + } + case .fixedint: + if let v = value as? Int { + data.next(v) + } + case .byte: + if let v = value as? UInt8 { + data.next(v) + } + case .boolean: + if let v = value as? Bool { + let byte: UInt8 = v ? 1 : 0 + data.next(byte) + } + case .double: + if let v = value as? Double { + data.next(v) + } + case .globalwindow: + break + case .lengthprefix(let coder): + let subData = try coder.encode(value) + data.varint(subData.count) + data.append(subData) + case let .keyvalue(keyCoder, valueCoder): + if let v = value as? 
AnyKeyValue { + try keyCoder.encode(v.anyKey, data: &data) + // We do a special case check here to account for the fact that + // keyvalue is used both for group by as well as a pair type + switch valueCoder { + case .iterable(_): + try valueCoder.encode(v.anyValues,data:&data) + default: + try valueCoder.encode(v.anyValue,data:&data) + } + } + case let .iterable(coder): + if let v = value as? [Any] { + data.next(Int32(truncatingIfNeeded: v.count)) + for item in v { + try coder.encode(item, data: &data) + } + } + case let .windowedvalue(valueCoder, windowCoder): + if let (v,ts,w) = value as? (Any,Date,Window) { + //Timestamp + data.next( (ts.millisecondsSince1970 &- Int64(-9223372036854775808)).bigEndian ) + switch w { + case .global: + data.next(Int32(0)) + default: + data.next(Int32(1)) + try windowCoder.encode(w,data:&data) + } + // TODO: Real Panes + data.append(UInt8(1 >> 5 | 1 >> 6 | 1 >> 7)) + try valueCoder.encode(v, data: &data) + } + default: + throw ApacheBeamError.runtimeError("Encoding of \(self.urn) coders not supported.") + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift new file mode 100644 index 00000000000..e8c23b9d4cf --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Coders/Coder.swift @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import Foundation + +public indirect enum Coder { + // Special + case unknown(String) + case custom(Data) + + // Scalar standard coders + case double,varint,fixedint,byte,bytes,string,boolean,globalwindow + + // Composite standard coders + + case keyvalue(Coder,Coder) + case iterable(Coder) + case lengthprefix(Coder) + case windowedvalue(Coder,Coder) + + // TODO: Row Coder +} + + +public extension Coder { + + var urn: String { + switch self { + case let .unknown(name): + return .coderUrn(name) + case .custom(_): + return .coderUrn("custom") + case .double: + return .coderUrn("double") + case .varint: + return .coderUrn("varint") + case .fixedint: + return .coderUrn("integer") + case .bytes: + return .coderUrn("bytes") + case .byte: + return .coderUrn("byte") + case .string: + return .coderUrn("string_utf8") + case .boolean: + return .coderUrn("bool") + case .globalwindow: + return .coderUrn("global_window") + case .keyvalue(_, _): + return .coderUrn("kv") + case .iterable(_): + return .coderUrn("iterable") + case .lengthprefix(_): + return .coderUrn("length_prefix") + case .windowedvalue(_, _): + return .coderUrn("windowed_value") + } + } + + static let capabilities:[String] = ["byte","bytes","bool","varint","double","integer","string_utf8","length_prefix","kv","iterable","windowed_value","global_window"] + .map({ .coderUrn($0) }) +} + +extension Coder : Hashable { + public func hash(into hasher: inout Hasher) { + hasher.combine(self.urn) + switch self { + case let .keyvalue(k,v): + k.hash(into:&hasher) + v.hash(into:&hasher) + case let 
.iterable(c): + c.hash(into:&hasher) + case let .lengthprefix(c): + c.hash(into:&hasher) + case let .windowedvalue(v, w): + v.hash(into:&hasher) + w.hash(into:&hasher) + default: + break + } + } +} + +extension Coder : Equatable { + public static func ==(_ lhs: Coder,_ rhs: Coder) -> Bool { + switch (lhs,rhs) { + case let (.unknown(a),.unknown(b)): + return a == b + case let (.custom(a),.custom(b)): + return a == b + case (.double,.double): + return true + case (.varint,.varint): + return true + case (.fixedint,.fixedint): + return true + case (.bytes,.bytes): + return true + case (.byte,.byte): + return true + case (.string,.string): + return true + case (.boolean,.boolean): + return true + case (.globalwindow,.globalwindow): + return true + case let (.keyvalue(lk,lv),.keyvalue(rk, rv)): + return lk == rk && lv == rv + case let (.iterable(a),.iterable(b)): + return a == b + case let (.lengthprefix(a),.lengthprefix(b)): + return a == b + case let (.windowedvalue(lv, lw),.windowedvalue(rv,rw)): + return lv == rv && lw == rw + default: + return false + } + } +} + + + +protocol CoderContainer { + subscript(name:String) -> CoderProto? { get } +} + +struct PipelineCoderContainer : CoderContainer { + let pipeline: PipelineProto + subscript(name: String) -> CoderProto? { + pipeline.components.coders[name] + } +} + +struct BundleCoderContainer : CoderContainer { + let bundle : ProcessBundleDescriptorProto + subscript(name: String) -> CoderProto? { + bundle.coders[name] + } +} + +extension Coder { + static func of(name:String,in container:CoderContainer) -> Coder? { + if let baseCoder = container[name] { + switch baseCoder.spec.urn { + case "beam:coder:bytes:v1": + return .bytes + case "beam:coder:varint:v1": + return .varint + case "beam:coder:string_utf8:v1": + return .string + case "beam:coder:double:v1": + return .double + case "beam:coder:iterable:v1": + return .iterable(.of(name: baseCoder.componentCoderIds[0], in: container)!) 
+ case "beam:coder:kv:v1": + return .keyvalue( + .of(name: baseCoder.componentCoderIds[0],in:container)!, + .of(name: baseCoder.componentCoderIds[1], in:container)!) + case "beam:coder:global_window:v1": + return .globalwindow + case "beam:coder:windowed_value:v1": + return .windowedvalue( + .of(name: baseCoder.componentCoderIds[0],in:container)!, + .of(name: baseCoder.componentCoderIds[1],in:container)!) + case "beam:coder:length_prefix:v1": + return .lengthprefix(.of(name: baseCoder.componentCoderIds[0],in:container)!) + default: + return .unknown(baseCoder.spec.urn) + } + } else { + return nil + } + } +} + + diff --git a/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift new file mode 100644 index 00000000000..73c6112d680 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/ArtifactInfo.swift @@ -0,0 +1,31 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +public struct ArtifactInfo { + let role: String + let type: String +} + +extension ArtifactInfo : ProtoConversion { + var proto: Org_Apache_Beam_Model_Pipeline_V1_ArtifactInformation { + .with { + $0.roleUrn = .beamUrn(role,type:"artifact:type") + $0.typeUrn = .beamUrn(type,type:"artifact:role") + } + } +} diff --git a/sdks/swift/Sources/ApacheBeam/Core/Environment.swift b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift new file mode 100644 index 00000000000..9e45c4575c4 --- /dev/null +++ b/sdks/swift/Sources/ApacheBeam/Core/Environment.swift @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+
+/// A Beam environment: a `Category` together with the capabilities and
+/// artifact dependencies it was created with.
+public struct Environment {
+
+    /// The kind of environment, with any kind-specific configuration.
+    public enum Category {
+        case system
+        // NOTE(review): the three strings are unlabeled; presumably process
+        // settings such as os/arch/command -- confirm and label them.
+        case process(String,String,String)
+        case docker(String)
+        case external(ApiServiceDescriptor)
+    }
+
+    let category: Category
+    let capabilities: [String]
+    let dependencies: [ArtifactInfo]
+
+    /// Creates an environment.
+    /// - Parameters:
+    ///   - category: The kind of environment; defaults to `.system`.
+    ///   - capabilities: Capability strings, copied into the proto's `capabilities`.
+    ///   - dependencies: Artifacts, copied into the proto's `dependencies`.
+    public init(_ category: Category = .system,capabilities:[String] = [],dependencies:[ArtifactInfo]) {
+        self.category = category
+        self.capabilities = capabilities
+        self.dependencies = dependencies
+    }
+}
+
+
+extension Environment : ProtoConversion {
+    typealias Proto = EnvironmentProto
+
+    /// The proto form of this environment. It carries `capabilities` and
+    /// `dependencies`; `category` is not yet translated into a URN/payload.
+    var proto: EnvironmentProto {
+        .with { p in
+            // TODO: derive p.urn / p.payload from `category`.
+            p.capabilities = capabilities
+            p.dependencies = dependencies.map { $0.proto }
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
new file mode 100644
index 00000000000..f8cd8f735c0
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/KeyValue.swift
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// Type-erased view of a key with its grouped values (see `KV`).
+public protocol AnyKeyValue {
+    var anyKey: Any { get }
+    var anyValues: [Any] { get }
+    var anyValue: Any? { get }
+}
+
+
+/// A key together with an array of values grouped under it.
+public struct KV<Key,Value> : AnyKeyValue {
+
+    public let key: Key
+    public let values: [Value]
+    /// The first grouped value, or `nil` when `values` is empty.
+    public var value: Value? { values.first }
+
+    /// Creates a pair holding a single value.
+    public init(_ key: Key,_ value: Value) {
+        self.key = key
+        self.values = [value]
+    }
+
+    /// Creates a pair holding several values.
+    public init(_ key: Key,_ values: [Value]) {
+        self.key = key
+        self.values = values
+    }
+
+    /// Builds a `KV` from a decoded `BeamValue`; a `.windowed` value is
+    /// unwrapped and its inner value converted.
+    /// - Throws: `ApacheBeamError.runtimeError` if `value` is neither `.kv` nor
+    ///   `.windowed`, or if the key/values are not of type `Key` / `[Value]`.
+    public init(beam value: BeamValue) throws {
+        switch value {
+        case .windowed(let v, _, _, _):
+            self = try KV<Key,Value>(beam:v)
+        case let .kv(k, v):
+            // Conditional casts so a type mismatch is reported as an error
+            // instead of crashing the process.
+            guard let key = k.baseValue as? Key else {
+                throw ApacheBeamError.runtimeError("KV key is not of type \(Key.self)")
+            }
+            guard let values = v.baseValue as? [Value] else {
+                throw ApacheBeamError.runtimeError("KV values are not of type [\(Value.self)]")
+            }
+            self.key = key
+            self.values = values
+        default:
+            throw ApacheBeamError.runtimeError("KV can only accept kv or windowed kv types")
+        }
+    }
+
+
+    public var anyKey: Any { key }
+    public var anyValues: [Any] { values.map({$0 as Any}) }
+    public var anyValue: Any? { value }
+}
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift
new file mode 100644
index 00000000000..5727b6c3741
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Core/Windowing.swift
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import Foundation
+
+
+/// The window an element is assigned to.
+public enum Window {
+    /// The global window; its `maxTimestamp` is `Date.distantFuture`.
+    case global
+    /// A window described by one date, which is its end (see `maxTimestamp`).
+    case bounded(Date)
+    /// A window described by two dates; the second is its end (see `maxTimestamp`).
+    case interval(Date,Date)
+
+    /// The latest timestamp belonging to this window.
+    var maxTimestamp : Date {
+        switch self {
+        case .global:
+            return .distantFuture
+        case let .bounded(end), let .interval(_, end):
+            return end
+        }
+    }
+}
+
+/// Placeholder with no cases yet, so no value of this type can be created.
+public enum WindowingStrategy {
+
+}
+
+/// Timing of a pane; raw values are the upper-case names.
+public enum Timing : String {
+    case early = "EARLY"
+    case onTime = "ON_TIME"
+    case late = "LATE"
+    case unknown = "UNKNOWN"
+}
+
+/// Pane metadata: its `timing`, `index` and `onTimeIndex`, plus `first`/`last` flags.
+public protocol PaneInfo {
+    var timing : Timing { get }
+    var index : UInt64 { get }
+    var onTimeIndex: UInt64 { get }
+    var first : Bool { get }
+    var last : Bool { get }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
new file mode 100644
index 00000000000..706c2036cbf
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Decoding.swift
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+extension Data {
+    /// Reads a base-128 variable-length integer from the front of the data and
+    /// removes the bytes it occupied.
+    ///
+    /// Each byte contributes its low 7 bits, least-significant group first; a
+    /// set high bit means another byte follows. At most 10 bytes are accepted.
+    /// The 64-bit result is reinterpreted bit-for-bit as `Int`, so a value above
+    /// `Int.max` (the two's-complement form of a negative number) yields that
+    /// negative number instead of trapping.
+    ///
+    /// - Returns: The decoded integer.
+    /// - Throws: `ApacheBeamError.runtimeError` if the varint is longer than
+    ///   64 bits or the data ends before its last byte. The data is left
+    ///   unchanged when an error is thrown.
+    mutating func varint() throws -> Int {
+        var result: UInt64 = 0
+        var shift: UInt64 = 0
+        var consumed = 0
+        // Iterating the bytes (rather than walking a raw pointer) keeps every
+        // read inside the buffer, including for empty or truncated input.
+        for byte in self {
+            if shift > 63 {
+                throw ApacheBeamError.runtimeError("Malformed Varint. Too large.")
+            }
+            consumed += 1
+            result |= UInt64(byte & 0x7f) << shift
+            if byte & 0x80 == 0 {
+                self = self.advanced(by: consumed)
+                return Int(truncatingIfNeeded: result)
+            }
+            shift += 7
+        }
+        throw ApacheBeamError.runtimeError("Malformed Varint. Unexpected end of data.")
+    }
+
+    /// Reads a varint length prefix followed by that many bytes, removing both.
+    ///
+    /// - Returns: The length-delimited bytes as a new `Data` value.
+    /// - Throws: `ApacheBeamError.runtimeError` if the prefix is malformed,
+    ///   negative, or larger than the bytes remaining. The data is left
+    ///   unchanged when an error is thrown.
+    mutating func subdata() throws -> Data {
+        var remaining = self
+        let length = try remaining.varint()
+        guard length >= 0, length <= remaining.count else {
+            throw ApacheBeamError.runtimeError("Malformed length prefix. Expected \(length) bytes, \(remaining.count) available.")
+        }
+        let start = remaining.startIndex
+        let result = remaining.subdata(in: start..<(start + length))
+        self = remaining.advanced(by: length)
+        return result
+    }
+
+    /// Reads the raw in-memory bytes of a `T` (host byte order) from the front
+    /// of the data and removes `MemoryLayout<T>.size` bytes.
+    ///
+    /// - Parameter type: The type to read. Only meaningful for trivial types
+    ///   such as fixed-width integers.
+    /// - Returns: The value read.
+    /// - Throws: `ApacheBeamError.runtimeError` if fewer than
+    ///   `MemoryLayout<T>.size` bytes remain.
+    mutating func next<T>(_ type: T.Type) throws -> T {
+        let size = MemoryLayout<T>.size
+        guard count >= size else {
+            throw ApacheBeamError.runtimeError("Not enough data to read \(T.self): need \(size) bytes, have \(count).")
+        }
+        // The value may start at any byte offset, so load without assuming alignment.
+        let result = withUnsafeBytes { $0.loadUnaligned(as: T.self) }
+        self = self.advanced(by: size)
+        return result
+    }
+
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
new file mode 100644
index 00000000000..86007b21104
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Data+Encoding.swift
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+extension Data {
+    /// Appends `value` as a base-128 variable-length integer: 7 bits per byte,
+    /// least-significant group first, with the high bit set on every byte
+    /// except the last.
+    ///
+    /// Negative values are written using their 64-bit two's-complement bit
+    /// pattern, which always takes 10 bytes (`UInt64(value)` would trap on them).
+    ///
+    /// - Parameter value: The integer to append.
+    mutating func varint(_ value: Int) {
+        // Sign-extends on 32-bit platforms, so the output matches the 64-bit form.
+        var current = UInt64(truncatingIfNeeded: value)
+        while current >= 0x80 {
+            self.append(UInt8(truncatingIfNeeded: current) | 0x80)
+            current >>= 7
+        }
+        self.append(UInt8(current))
+    }
+
+    /// Appends the raw in-memory bytes of `value` (host byte order, no length
+    /// prefix). Only meaningful for trivial types such as fixed-width integers.
+    ///
+    /// - Parameter value: The value whose bytes are appended.
+    mutating func next<T>(_ value: T) {
+        Swift.withUnsafeBytes(of:value) {
+            self.append(contentsOf: $0)
+        }
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
new file mode 100644
index 00000000000..16eb658d050
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/Date+Timestamp.swift
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+extension Date {
+    /// Milliseconds since the Unix epoch, rounded to the nearest whole
+    /// millisecond (halfway cases round away from zero).
+    var millisecondsSince1970 : Int64 {
+        let millis = timeIntervalSince1970 * 1000.0
+        return Int64(millis.rounded())
+    }
+
+    /// Creates the date `millisecondsSince1970` milliseconds after the Unix epoch.
+    init(millisecondsSince1970: Int64) {
+        self.init(timeIntervalSince1970: TimeInterval(millisecondsSince1970) / 1000)
+    }
+}
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
new file mode 100644
index 00000000000..2a50e950386
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/ProtoConversion.swift
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/// A type that can render itself as a protocol-buffer message.
+protocol ProtoConversion {
+    /// The message type this value converts to.
+    associatedtype Proto
+
+    /// This value expressed as its `Proto` message.
+    var proto: Proto { get }
+}
+
+
diff --git a/sdks/swift/Sources/ApacheBeam/Internal/String+Urns.swift b/sdks/swift/Sources/ApacheBeam/Internal/String+Urns.swift
new file mode 100644
index 00000000000..48fa8508f5d
--- /dev/null
+++ b/sdks/swift/Sources/ApacheBeam/Internal/String+Urns.swift
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/// Helpers for building Beam URN strings.
+public extension String {
+    /// Builds `beam:<type>:<name>:<version>`.
+    static func beamUrn(_ name:String,type:String="beam",version:String="v1") -> String {
+        "beam:\(type):\(name):\(version)"
+    }
+
+    /// Builds a coder URN, `beam:coder:<name>:<version>`.
+    static func coderUrn(_ name:String,version:String="v1") -> String {
+        beamUrn(name, type: "coder", version: version)
+    }
+
+    /// Builds a runner URN, `beam:runner:<name>:<version>`.
+    static func runnerUrn(_ name:String,version:String="v1") -> String {
+        beamUrn(name, type: "runner", version: version)
+    }
+
+    /// Builds a DoFn URN, `beam:dofn:<sdk>:<name>:<version>`.
+    static func doFnUrn(_ name:String,sdk:String="swiftsdk",version:String="v1") -> String {
+        beamUrn(name, type: "dofn:\(sdk)", version: version)
+    }
+
+    /// Builds a transform URN, `beam:transform:<name>:<version>`.
+    static func transformUrn(_ name:String,version:String="v1") -> String {
+        beamUrn(name, type: "transform", version: version)
+    }
+}
diff --git a/sdks/swift/Tests/ApacheBeamTests/ApacheBeamTests.swift b/sdks/swift/Tests/ApacheBeamTests/ApacheBeamTests.swift
new file mode 100644
index 00000000000..23f85941311
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/ApacheBeamTests.swift
@@ -0,0 +1,12 @@
+import XCTest
+@testable import ApacheBeam
+
+final class ApacheBeamTests: XCTestCase {
+    func testExample() throws {
+        // XCTest Documentation
+        // https://developer.apple.com/documentation/xctest
+
+        // Defining Test Cases and Test Methods
+        // https://developer.apple.com/documentation/xctest/defining_test_cases_and_test_methods
+    }
+}
diff --git a/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
new file mode 100644
index 00000000000..74031c8b545
--- /dev/null
+++ b/sdks/swift/Tests/ApacheBeamTests/Coders/CoderTests.swift
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import Foundation
+
+
+import XCTest
+@testable import ApacheBeam
+
+final class CoderTests: XCTestCase {
+    /// Decodes a default impulse element and checks that it carries empty
+    /// bytes in the global window.
+    func testDefaultImpulseDecode() throws {
+        var impulse = Data([0x7f,0xdf,0x3b,0x64,0x5a,0x1c,0xac,0x09,0x00,0x00,0x00,0x01,0x0f,0x00])
+        let impulseCoder = Coder.windowedvalue(.bytes, .globalwindow)
+
+        let value = try impulseCoder.decode(&impulse)
+        guard case let .windowed(payload, _, _, window) = value else {
+            XCTFail("Expecting a windowed value, got \(value)")
+            return
+        }
+        let data = try XCTUnwrap(payload.baseValue as? Data)
+        XCTAssertTrue(data.isEmpty)
+
+        let w = try XCTUnwrap(window.baseValue as? Window)
+        guard case .global = w else {
+            XCTFail("Expected window to be global not \(w)")
+            return
+        }
+    }
+
+    /// Round-trips empty bytes through the windowed-value coder and checks the
+    /// encoded size and that the timestamp comes back within one millisecond.
+    func testWindowedValue() throws {
+        let coder = Coder.windowedvalue(.bytes, .globalwindow)
+        // A fixed instant with a sub-second part keeps the test deterministic.
+        let timestamp = Date(timeIntervalSince1970: 1_691_628_748.123)
+        var data = try coder.encode((Data(),timestamp,Window.global))
+        XCTAssertEqual(data.count, 14)
+        let value = try coder.decode(&data)
+        guard case let .windowed(_,ts,_,_) = value else {
+            XCTFail("Expected a windowed value, got \(value)")
+            return
+        }
+        // Compare numerically: string descriptions only have second resolution
+        // and differ whenever millisecond rounding crosses a second boundary.
+        XCTAssertEqual(ts.timeIntervalSince1970, timestamp.timeIntervalSince1970, accuracy: 0.001)
+    }
+}
