This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ee53ab6 improve datagenerator source (#3203) ee53ab6 is described below commit ee53ab6f21299dad0f19ab42caa94a7e9c336167 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon Dec 17 09:01:06 2018 -0800 improve datagenerator source (#3203) * improve datagenerator source * cleaning up --- pulsar-io/data-genenator/pom.xml | 5 ++ ...atorSource.java => DataGeneratorPrintSink.java} | 30 ++----- .../io/datagenerator/DataGeneratorSource.java | 4 +- .../org/apache/pulsar/io/datagenerator/Person.java | 96 ++++++++++++++++++++++ .../resources/META-INF/services/pulsar-io.yaml | 1 + 5 files changed, 111 insertions(+), 25 deletions(-) diff --git a/pulsar-io/data-genenator/pom.xml b/pulsar-io/data-genenator/pom.xml index 8e9f7ad..6a6b72a 100644 --- a/pulsar-io/data-genenator/pom.xml +++ b/pulsar-io/data-genenator/pom.xml @@ -43,6 +43,11 @@ <artifactId>jfairy</artifactId> <version>0.5.9</version> </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> </dependencies> diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java similarity index 57% copy from pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java copy to pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java index 6087747..6944247 100644 --- a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java +++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorPrintSink.java @@ -18,38 +18,24 @@ */ package org.apache.pulsar.io.datagenerator; -import io.codearte.jfairy.Fairy; -import 
io.codearte.jfairy.producer.person.Person; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.Source; -import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; import java.util.Map; -import java.util.Optional; - -public class DataGeneratorSource implements Source<Person> { +@Slf4j +public class DataGeneratorPrintSink implements Sink<Person> { @Override - public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { } @Override - public Record<Person> read() throws Exception { - Thread.sleep(50); - Fairy fairy = Fairy.create(); - return new Record<Person>() { - @Override - public Optional<String> getKey() { - return Optional.empty(); - } - - @Override - public Person getValue() { - return fairy.person(); - } - }; + public void write(Record<Person> record) throws Exception { + log.info("RECV: {}", record.getValue()); } @Override diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java index 6087747..1a9f63f 100644 --- a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java +++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/DataGeneratorSource.java @@ -19,7 +19,6 @@ package org.apache.pulsar.io.datagenerator; import io.codearte.jfairy.Fairy; -import io.codearte.jfairy.producer.person.Person; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.Source; import org.apache.pulsar.io.core.SourceContext; @@ -38,7 +37,6 @@ public class DataGeneratorSource implements Source<Person> { @Override public Record<Person> read() throws Exception { Thread.sleep(50); - 
Fairy fairy = Fairy.create(); return new Record<Person>() { @Override public Optional<String> getKey() { @@ -47,7 +45,7 @@ public class DataGeneratorSource implements Source<Person> { @Override public Person getValue() { - return fairy.person(); + return new Person(Fairy.create().person()); } }; } diff --git a/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java new file mode 100644 index 0000000..c822d31 --- /dev/null +++ b/pulsar-io/data-genenator/src/main/java/org/apache/pulsar/io/datagenerator/Person.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.io.datagenerator; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +/** + * This class serves as a copy of io.codearte.jfairy.producer.person.Person + * because io.codearte.jfairy.producer.person.Person does not + * have the no-argument constructor needed to deserialize it as a POJO + */ +public class Person { + private Address address; + private String firstName; + private String middleName; + private String lastName; + private String email; + private String username; + private String password; + private Sex sex; + private String telephoneNumber; + @org.apache.avro.reflect.AvroSchema("{ \"type\": \"long\", \"logicalType\": \"timestamp-millis\" }") + private long dateOfBirth; + private Integer age; + private Company company; + private String companyEmail; + private String nationalIdentityCardNumber; + private String nationalIdentificationNumber; + private String passportNumber; + + public enum Sex { + MALE, + FEMALE; + + private Sex() { + } + } + + public Person(io.codearte.jfairy.producer.person.Person person) { + this(new Address(person.getAddress()), person.getFirstName(), person.getMiddleName(), person.getLastName(), + person.getEmail(), person.getUsername(), person.getPassword(), Sex.valueOf(person.getSex().name()), + person.getTelephoneNumber(), person.getDateOfBirth().getMillis(), + person.getAge(), new Company(person.getCompany()), person.getCompanyEmail(), + person.getNationalIdentityCardNumber(), person.getNationalIdentificationNumber(), + person.getPassportNumber()); + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class Company { + private String name; + private String domain; + private String email; + private String vatIdentificationNumber; + public Company(io.codearte.jfairy.producer.company.Company company) { + this(company.getName(), company.getDomain(), company.getEmail(), company.getVatIdentificationNumber()); +
} + } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class Address { + protected String street; + protected String streetNumber; + protected String apartmentNumber; + protected String postalCode; + protected String city; + + public Address(io.codearte.jfairy.producer.person.Address address) { + this(address.getStreet(), address.getStreetNumber(), address.getApartmentNumber(), address.getPostalCode(), address.getCity()); + } + } +} diff --git a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml index d0fd7a6..0e5d723 100644 --- a/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/data-genenator/src/main/resources/META-INF/services/pulsar-io.yaml @@ -20,3 +20,4 @@ name: data-generator description: Test data generator source sourceClass: org.apache.pulsar.io.datagenerator.DataGeneratorSource +sinkClass: org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink