sijie closed pull request #2253: Add Test Specific Examples URL: https://github.com/apache/incubator-pulsar/pull/2253
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java new file mode 100644 index 0000000000..f433372b60 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/ByteBufferSerDe.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.SerDe; + +import java.nio.ByteBuffer; + +/** + * Simple ByteBuffer Serializer and Deserializer + */ +public class ByteBufferSerDe implements SerDe<Integer> { + @Override + public Integer deserialize(byte[] bytes) { + return ByteBuffer.wrap(bytes).getInt(); + } + + @Override + public byte[] serialize(Integer integer) { + return ByteBuffer.allocate(4).putInt(integer).array(); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java new file mode 100644 index 0000000000..da696fdc42 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CommaWindowFunction.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import java.util.Collection; + +/** + * Comma based aggregation window function example + */ +public class CommaWindowFunction implements java.util.function.Function<Collection<String>, String> { + @Override + public String apply(Collection<String> integers) { + return String.join(",", integers); + } +} diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java new file mode 100644 index 0000000000..173393b62a --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseObject.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +public class CustomBaseObject { + private long baseValue; +} + + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java new file mode 100644 index 0000000000..b8a6a5714c --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseSerde.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.SerDe; + +import java.nio.ByteBuffer; + +/** + * Example of using a byte buffer serialization for Custom object + */ +public class CustomBaseSerde implements SerDe<CustomBaseObject> { + @Override + public CustomBaseObject deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + return new CustomBaseObject(buffer.getLong()); + } + + @Override + public byte[] serialize(CustomBaseObject object) { + ByteBuffer buffer = ByteBuffer.allocate(8); + buffer.putLong(object.getBaseValue()); + return buffer.array(); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java new file mode 100644 index 0000000000..d3b9225a2b --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToBaseFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +/** + * Function example of processing on a custom object type + */ +public class CustomBaseToBaseFunction implements Function<CustomBaseObject, CustomBaseObject> { + + @Override + public CustomBaseObject process(CustomBaseObject input, Context context) { + return new CustomBaseObject(input.getBaseValue() + 100); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java new file mode 100644 index 0000000000..9e8cab52e1 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomBaseToDerivedFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +/** + * Examplf of fucntion doing a type object conversion between input ann output type + */ +public class CustomBaseToDerivedFunction implements Function<CustomBaseObject, CustomDerivedObject> { + + @Override + public CustomDerivedObject process(CustomBaseObject input, Context context) { + return new CustomDerivedObject(input.getBaseValue() + 100, (int)input.getBaseValue() + 50); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java new file mode 100644 index 0000000000..464e977be8 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedObject.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class CustomDerivedObject extends CustomBaseObject { + private int derivedValue; + public CustomDerivedObject(long baseValue, int derivedValue) { + super(baseValue); + this.derivedValue = derivedValue; + } +} + + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java new file mode 100644 index 0000000000..219cdadcdc --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedSerde.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.SerDe; + +import java.nio.ByteBuffer; + +/** + * Example to derived object serialization + */ +public class CustomDerivedSerde implements SerDe<CustomDerivedObject> { + @Override + public CustomDerivedObject deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + return new CustomDerivedObject(buffer.getLong(), buffer.getInt()); + } + + @Override + public byte[] serialize(CustomDerivedObject object) { + ByteBuffer buffer = ByteBuffer.allocate(12); + buffer.putLong(object.getBaseValue()); + buffer.putInt(object.getDerivedValue()); + return buffer.array(); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java new file mode 100644 index 0000000000..8bcb0d028d --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToBaseFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +/** + * Example of function converting a derived object to a base object. + */ +public class CustomDerivedToBaseFunction implements Function<CustomDerivedObject, CustomBaseObject> { + + @Override + public CustomBaseObject process(CustomDerivedObject input, Context context) { + return new CustomBaseObject(input.getBaseValue() + 101); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java new file mode 100644 index 0000000000..a86c82df3e --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/CustomDerivedToDerivedFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +/** + * Example of 2nd order conversion from a base object for composition pipelines + */ +public class CustomDerivedToDerivedFunction implements Function<CustomDerivedObject, CustomDerivedObject> { + + @Override + public CustomDerivedObject process(CustomDerivedObject input, Context context) { + return new CustomDerivedObject(input.getBaseValue() + 101, input.getDerivedValue() + 150); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java new file mode 100644 index 0000000000..add14d474a --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserConfigFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + +import java.util.Optional; + +/** + * An example demonstrate retrieving user config value from Context + */ +public class UserConfigFunction implements Function<String, String> { + + @Override + public String process(String input, Context context) { + Optional<Object> whatToWrite = context.getUserConfigValue("WhatToWrite"); + if (whatToWrite.get() != null) { + return (String)whatToWrite.get(); + } else { + return "Not a nice way"; + } + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java new file mode 100644 index 0000000000..2979d22f02 --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserExceptionFunction.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; + + +/** + * This Function simulates a pulsar function encountering runtime errors. + */ +public class UserExceptionFunction implements Function<String, String> { + @Override + public String process(String input, Context context) { + throw new RuntimeException("This wont work"); + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java new file mode 100644 index 0000000000..1d31c476ee --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/UserPublishFunction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import org.apache.pulsar.functions.api.Context; +import org.apache.pulsar.functions.api.Function; +import org.apache.pulsar.functions.api.utils.DefaultSerDe; + +import java.util.Optional; + +/** + * An example demonstrate publishing messages through Context + */ +public class UserPublishFunction implements Function<String, Void> { + + @Override + public Void process(String input, Context context) { + Optional<Object> topicToWrite = context.getUserConfigValue("topic"); + if (topicToWrite.get() != null) { + context.publish((String)topicToWrite.get(), input, DefaultSerDe.class.getName()); + } + return null; + } +} + diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java new file mode 100644 index 0000000000..cb0aa9f1bf --- /dev/null +++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/WindowDurationFunction.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.api.examples; + +import java.util.Collection; + +/** + * This functions collects the timestamp during the window operation + */ +public class WindowDurationFunction implements java.util.function.Function<Collection<String>, String> { + @Override + public String apply(Collection<String> integers) { + long time = System.currentTimeMillis(); + return String.format("%s:%s", String.join(",", integers), time); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
