This is an automated email from the ASF dual-hosted git repository. jfeinauer pushed a commit to branch feature/calcite-adapter in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit df1a6dbfb8cd456ea424152f39cbf022325274c8 Author: Julian Feinauer <j.feina...@pragmaticminds.de> AuthorDate: Sun Dec 23 21:33:47 2018 +0100 [CALCITE] Running Version. --- integrations/apache-calcite/pom.xml | 63 ++++++- .../main/java/org/apache/plc4x/Plc4xBaseTable.java | 193 +++++++++++++++++++++ .../main/java/org/apache/plc4x/Plc4xSchema.java | 63 +++++-- .../java/org/apache/plc4x/Plc4xSchemaFactory.java | 33 +++- .../java/org/apache/plc4x/Plc4xStreamTable.java | 45 +++++ .../src/main/java/org/apache/plc4x/Plc4xTable.java | 88 +++------- .../java/org/apache/plc4x/DriverManagerTest.java | 69 +++++++- .../java/org/apache/plc4x/Plc4XBaseTableTest.java | 48 +++++ .../test/java/org/apache/plc4x/Plc4xTableTest.java | 24 --- .../apache-calcite/src/test/resources/example.yml | 4 +- .../apache-calcite/src/test/resources/logback.xml | 38 ++++ .../apache-calcite/src/test/resources/model.yml | 33 ++++ .../java/scraper/config/JobConfiguration.java | 2 +- 13 files changed, 588 insertions(+), 115 deletions(-) diff --git a/integrations/apache-calcite/pom.xml b/integrations/apache-calcite/pom.xml index 6688586..5038722 100644 --- a/integrations/apache-calcite/pom.xml +++ b/integrations/apache-calcite/pom.xml @@ -1,4 +1,23 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> + <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> @@ -18,6 +37,15 @@ <version>1.18.0-SNAPSHOT</version> </dependency> <dependency> + <groupId>org.apache.calcite</groupId> + <artifactId>calcite-linq4j</artifactId> + <version>1.18.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> <groupId>org.apache.plc4x</groupId> <artifactId>plc4j-scraper</artifactId> <version>0.3.0-SNAPSHOT</version> @@ -27,9 +55,42 @@ <groupId>org.apache.plc4x</groupId> <artifactId>plc4j-driver-simulated</artifactId> <version>0.3.0-SNAPSHOT</version> - <scope>test</scope> + <!--<scope>test</scope>--> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <version>3.0.0</version> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>assemble-all</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <usedDependencies> + <usedDependency>org.apache.plc4x:plc4j-driver-simulated</usedDependency> + </usedDependencies> + </configuration> + </plugin> + </plugins> + </build> + </project> \ No newline at end of file diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java new file mode 100644 index 0000000..52b306b --- /dev/null +++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xBaseTable.java @@ -0,0 +1,193 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.plc4x; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.AbstractEnumerable; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Enumerator; +import org.apache.calcite.rel.*; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.plc4x.java.scraper.config.JobConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public abstract class Plc4xBaseTable extends AbstractTable { + + private static final Logger logger = LoggerFactory.getLogger(Plc4xBaseTable.class); + + private final BlockingQueue<Plc4xSchema.Record> queue; + private final JobConfiguration conf; + private final long tableCutoff; + private Plc4xSchema.Record current; + private List<String> names; + + public Plc4xBaseTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) { + this.tableCutoff = tableCutoff; + logger.info("Instantiating new PLC4X Table with configuration: {}", conf); + this.queue = queue; + this.conf = conf; + // Extract names + names = new ArrayList<>(conf.getFields().keySet()); + } + + @Override + public Statistic getStatistic() { + return new Statistic() { + + public Double getRowCount() { + return tableCutoff > 0 ? (double)tableCutoff : null; + } + + public boolean isKey(ImmutableBitSet columns) { + return false; + } + + public List<RelReferentialConstraint> getReferentialConstraints() { + return Collections.emptyList(); + } + + public List<RelCollation> getCollations() { + return Collections.singletonList(RelCollationImpl.of(new RelFieldCollation(0, RelFieldCollation.Direction.ASCENDING))); + } + + public RelDistribution getDistribution() { + return RelDistributionTraitDef.INSTANCE.getDefault(); + } + }; + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + // Create the table spec + // Block until the first result is in the queue + CompletableFuture<Plc4xSchema.Record> future = CompletableFuture.supplyAsync(new FirstElementFetcher(queue)); + Plc4xSchema.Record first; + try { + first = future.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Thread was interrupted!", e); + } catch (ExecutionException | TimeoutException e) { + throw new RuntimeException("Unable to fetch first record and infer arguments!"); + } + logger.info("Inferring types for Table '{}' based on values: {}", conf.getName(), first.values); + // Extract types + List<RelDataType> types = names.stream() + .map(n -> { + Object o = first.values.get(n); + logger.debug("Infer field '{}' as class '{}'", n, o.getClass()); + return typeFactory.createJavaType(o.getClass()); + }) + .collect(Collectors.toList()); + List<String> pre = new ArrayList<>(Arrays.asList("timestamp", "source")); + pre.addAll(names); + List<RelDataType> preTypes = Stream.of(Timestamp.class, String.class) + .map(typeFactory::createJavaType) + .collect(Collectors.toList()); + preTypes.addAll(types); + return typeFactory.createStructType(preTypes, pre); + } + + /** + * if tableCutoff is positive, then the row gets limited to that. + */ + public Enumerable<Object[]> scanInternal(DataContext root) { + return new AbstractEnumerable<Object[]>() { + @Override + public Enumerator<Object[]> enumerator() { + return new Enumerator<Object[]>() { + + private final AtomicLong counter = new AtomicLong(0); + + @Override + public Object[] current() { + List<Object> objects = new ArrayList<>(Arrays.asList(new Timestamp(current.timestamp.toEpochMilli()), current.source)); + List<Object> objects2 = names.stream().map(name -> current.values.get(name)).collect(Collectors.toList()); + objects.addAll(objects2); + return objects.toArray(); + } + + @Override + public boolean moveNext() { + try { + current = queue.take(); + // If stream, simply return + if (tableCutoff <= 0L) { + return true; + } + // If table, return if below cutoff + return counter.getAndIncrement() < tableCutoff; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return false; + } + + @Override + public void reset() { + counter.set(0); + } + + @Override + public void close() { + // Unimplemented + } + }; + } + }; + } + + /** + * Waits until a first (non null) element is in the queue + */ + private static class FirstElementFetcher implements Supplier<Plc4xSchema.Record> { + + private final BlockingQueue<Plc4xSchema.Record> queue; + + private FirstElementFetcher(BlockingQueue<Plc4xSchema.Record> queue) { + this.queue = queue; + } + + @Override + public Plc4xSchema.Record get() { + Plc4xSchema.Record first; + do { + first = queue.peek(); + } while (first == null); + return first; + } + } + +} diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java index 51021d6..9e38bfc 100644 --- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java +++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchema.java @@ -1,3 +1,21 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ package org.apache.plc4x; import org.apache.calcite.schema.Table; @@ -7,54 +25,73 @@ import org.apache.plc4x.java.scraper.Scraper; import org.apache.plc4x.java.scraper.config.JobConfiguration; import org.apache.plc4x.java.scraper.config.ScraperConfiguration; +import java.time.Instant; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.stream.Collectors; -/** - * Scraper -> Handler -> Table - */ public class Plc4xSchema extends AbstractSchema { - private final ScraperConfiguration configuration; - private final Scraper scraper; - private final QueueHandler handler; - private final Map<String, BlockingQueue<Object[]>> queues; - private final Map<String, Table> tableMap; + protected final ScraperConfiguration configuration; + protected final Scraper scraper; + protected final QueueHandler handler; + protected final Map<String, BlockingQueue<Record>> queues; + protected final Map<String, Table> tableMap; - Plc4xSchema(ScraperConfiguration configuration) { + public Plc4xSchema(ScraperConfiguration configuration, long tableCutoff) { this.configuration = configuration; this.handler = new QueueHandler(); this.scraper = new Scraper(configuration, handler); this.queues = configuration.getJobConfigurations().stream() .collect(Collectors.toMap( JobConfiguration::getName, - conf -> new ArrayBlockingQueue<Object[]>(100) + conf -> new ArrayBlockingQueue<Record>(1000) )); // Create the tables this.tableMap = configuration.getJobConfigurations().stream() .collect(Collectors.toMap( JobConfiguration::getName, - conf -> new Plc4xTable(queues.get(conf.getName()), conf) + conf -> defineTable(queues.get(conf.getName()), conf, tableCutoff) )); // Start the scraper this.scraper.start(); } + Table defineTable(BlockingQueue<Record> queue, JobConfiguration configuration, Long limit) { + if (limit <= 0) { + return new Plc4xStreamTable(queue, configuration); + } else { + return new Plc4xTable(queue, configuration, limit); + } + } + @Override protected Map<String, Table> getTableMap() { // Return a map of all jobs return this.tableMap; } + public static class Record { + + public final Instant timestamp; + public final String source; + public final Map<String, Object> values; + + public Record(Instant timestamp, String source, Map<String, Object> values) { + this.timestamp = timestamp; + this.source = source; + this.values = values; + } + } + class QueueHandler implements ResultHandler { @Override public void handle(String job, String alias, Map<String, Object> results) { - Object[] objects = results.values().toArray(); try { - queues.get(job).put(objects); + Record record = new Record(Instant.now(), alias, results); + queues.get(job).put(record); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java index 368f9fa..6b1d16f 100644 --- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java +++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xSchemaFactory.java @@ -1,3 +1,21 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ package org.apache.plc4x; import org.apache.calcite.schema.Schema; @@ -13,8 +31,9 @@ public class Plc4xSchemaFactory implements SchemaFactory { @Override public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { + // Fetch config Object config = operand.get("config"); - Validate.notNull(config, "No configuration file given. Please specify one with 'config=...'"); + Validate.notNull(config, "No configuration file given. Please specify operand 'config'...'"); // Load configuration from file ScraperConfiguration configuration; try { @@ -22,8 +41,18 @@ public class Plc4xSchemaFactory implements SchemaFactory { } catch (IOException e) { throw new RuntimeException("Unable to load configuration file!", e); } + + // Fetch limit + Object limit = operand.get("limit"); + Validate.notNull(limit, "No limit for the number of rows for a table. Please specify operand 'config'...'"); + long parsedLimit; + try { + parsedLimit = Long.parseLong(limit.toString()); + } catch (NumberFormatException e) { + throw new RuntimeException("Given limit '" + limit + "' cannot be parsed to valid long!", e); + } // Pass the configuration to the Schema - return new Plc4xSchema(configuration); + return new Plc4xSchema(configuration, parsedLimit); } } diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java new file mode 100644 index 0000000..7725a1d --- /dev/null +++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xStreamTable.java @@ -0,0 +1,45 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.plc4x; + +import org.apache.calcite.DataContext; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.StreamableTable; +import org.apache.calcite.schema.Table; +import org.apache.plc4x.java.scraper.config.JobConfiguration; + +import java.util.concurrent.BlockingQueue; + +public class Plc4xStreamTable extends Plc4xBaseTable implements ScannableTable, StreamableTable { + + public Plc4xStreamTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf) { + super(queue, conf, -1L); + } + + @Override + public Enumerable<Object[]> scan(DataContext root) { + return super.scanInternal(root); + } + + @Override + public Table stream() { + return this; + } +} diff --git a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java index ab19bd5..cd7c142 100644 --- a/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java +++ b/integrations/apache-calcite/src/main/java/org/apache/plc4x/Plc4xTable.java @@ -1,83 +1,39 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ package org.apache.plc4x; import org.apache.calcite.DataContext; -import org.apache.calcite.linq4j.AbstractEnumerable; import org.apache.calcite.linq4j.Enumerable; -import org.apache.calcite.linq4j.Enumerator; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.schema.ScannableTable; -import org.apache.calcite.schema.StreamableTable; -import org.apache.calcite.schema.Table; -import org.apache.calcite.schema.impl.AbstractTable; import org.apache.plc4x.java.scraper.config.JobConfiguration; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.concurrent.BlockingQueue; -public class Plc4xTable extends AbstractTable implements StreamableTable, ScannableTable { +public class Plc4xTable extends Plc4xBaseTable implements ScannableTable { - private final BlockingQueue<Object[]> queue; - private final JobConfiguration conf; - private Object[] current; - - public Plc4xTable(BlockingQueue<Object[]> queue, JobConfiguration conf) { - this.queue = queue; - this.conf = conf; - } - - @Override - public RelDataType getRowType(RelDataTypeFactory typeFactory) { - // Create the table spec - List<String> names = new ArrayList<>(); - List<RelDataType> types = new ArrayList<>(); - for (Map.Entry<String, String> entry : conf.getFields().entrySet()) { - names.add(entry.getKey()); - types.add(typeFactory.createJavaType(String.class)); - } - return typeFactory.createStructType(types, names); + public Plc4xTable(BlockingQueue<Plc4xSchema.Record> queue, JobConfiguration conf, long tableCutoff) { + super(queue, conf, tableCutoff); } @Override public Enumerable<Object[]> scan(DataContext root) { - return new AbstractEnumerable<Object[]>() { - @Override - public Enumerator<Object[]> enumerator() { - return new Enumerator<Object[]>() { - @Override - public Object[] current() { - return current; - } - - @Override - public boolean moveNext() { - try { - current = queue.take(); - return true; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return false; - } - - @Override - public void reset() { - // Unimplemented - } - - @Override - public void close() { - // Unimplemented - } - }; - } - }; + return super.scanInternal(root); } - @Override - public Table stream() { - return this; - } } diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java index 9bdf9a3..5a6bc7c 100644 --- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java +++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/DriverManagerTest.java @@ -1,8 +1,25 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ package org.apache.plc4x; import org.apache.calcite.jdbc.CalciteConnection; import org.apache.calcite.jdbc.Driver; -import org.apache.calcite.schema.Schema; import org.apache.plc4x.java.scraper.config.ScraperConfiguration; import org.junit.jupiter.api.Test; @@ -10,7 +27,6 @@ import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.Collections; import java.util.Properties; public class DriverManagerTest { @@ -18,18 +34,57 @@ public class DriverManagerTest { @Test void instanciateJdbcConnection() throws SQLException, IOException { Driver driver = new Driver(); - Connection connection = driver.connect("jdbc:calcite://asdf;config=abc", new Properties()); + Connection connection = driver.connect("jdbc:calcite://asdf;config=abc;lex=MYSQL_ANSI", new Properties()); CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class); - calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml"))); + calciteConnection.getRootSchema().add("plc4x", new Plc4xSchema(ScraperConfiguration.fromFile("src/test/resources/example.yml"), 10)); + + // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery(); + ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\" WHERE source = 'test'").executeQuery(); + + // Print the header + int count = rs.getMetaData().getColumnCount(); + for (int i = 1; i <= count; i++) { + System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t"); + } + System.out.println(""); + + while (rs.next()) { + for (int i = 1; i <= count; i++) { + System.out.print(rs.getString(i) + "\t"); + } + System.out.println(""); + } + + connection.close(); + } + + @Test + void instantiateDirect() throws IOException, SQLException { + Driver driver = new Driver(); + Connection connection = driver.connect("jdbc:calcite:model=src/test/resources/model.yml;lex=MYSQL_ANSI", new Properties()); - ResultSet rs = connection.prepareStatement("SELECT STREAM * FROM \"plc4x\".\"job1\"").executeQuery(); + // ResultSet rs = connection.prepareStatement("SELECT STREAM \"test\", \"test\" * 2, \"test2\" FROM \"plc4x\".\"job1\"").executeQuery(); + ResultSet rs = connection.prepareStatement("SELECT * FROM \"plc4x-tables\".\"job1\"").executeQuery(); + + // Print the header + int count = rs.getMetaData().getColumnCount(); + for (int i = 1; i <= count; i++) { + System.out.print(rs.getMetaData().getColumnLabel(i) + "(" + rs.getMetaData().getColumnTypeName(i) + ")" + "\t"); + } + System.out.println(""); + + int row = 1; while (rs.next()) { - System.out.print("Spalte 1: " + rs.getString(1) + "\t"); - System.out.println("Spalte 2: " + rs.getString(2)); + System.out.print(row++ + "\t"); + for (int i = 1; i <= count; i++) { + System.out.print(rs.getString(i) + "\t"); + } + System.out.println(""); } connection.close(); } + } diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java new file mode 100644 index 0000000..c89d799 --- /dev/null +++ b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4XBaseTableTest.java @@ -0,0 +1,48 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ +package org.apache.plc4x; + +import org.apache.calcite.linq4j.Enumerator; +import org.apache.plc4x.java.scraper.config.JobConfiguration; +import org.assertj.core.api.WithAssertions; +import org.junit.jupiter.api.Test; + +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; + +class Plc4XBaseTableTest implements WithAssertions { + + @Test + void testOnBlockingQueue() { + ArrayBlockingQueue<Plc4xSchema.Record> queue = new ArrayBlockingQueue<>(100); + Plc4xStreamTable table = new Plc4xStreamTable(queue, new JobConfiguration("job1", 100, + Collections.emptyList(), + Collections.singletonMap("key", "address"))); + + Map<String, Object> objects = Collections.singletonMap("key", "value"); + queue.add(new Plc4xSchema.Record(Instant.now(), "", objects)); + + Enumerator<Object[]> enumerator = table.scan(null).enumerator(); + + assertThat(enumerator.moveNext()).isTrue(); + assertThat(enumerator.current()).containsExactly("value"); + } +} \ No newline at end of file diff --git a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java b/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java deleted file mode 100644 index 6d4fb17..0000000 --- a/integrations/apache-calcite/src/test/java/org/apache/plc4x/Plc4xTableTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.apache.plc4x; - -import org.apache.calcite.linq4j.Enumerator; -import org.assertj.core.api.WithAssertions; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.ArrayBlockingQueue; - -class Plc4xTableTest implements WithAssertions { - - @Test - void testOnBlockingQueue() { - ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<Object[]>(100); - Plc4xTable table = new Plc4xTable(queue, null); - - Object[] objects = new Object[0]; - queue.add(objects); - - Enumerator<Object[]> enumerator = table.scan(null).enumerator(); - - assertThat(enumerator.moveNext()).isTrue(); - assertThat(enumerator.current()).isEqualTo(objects); - } -} \ No newline at end of file diff --git a/integrations/apache-calcite/src/test/resources/example.yml b/integrations/apache-calcite/src/test/resources/example.yml index 93b58e2..7294ab0 100644 --- a/integrations/apache-calcite/src/test/resources/example.yml +++ b/integrations/apache-calcite/src/test/resources/example.yml @@ -19,12 +19,14 @@ --- sources: test: test:test + test2: test:test2 jobs: - name: job1 - scrapeRate: 1000 + scrapeRate: 10 sources: - test + - test2 fields: test: 'RANDOM/test:Integer' test2: 'RANDOM/test:String' \ No newline at end of file diff --git a/integrations/apache-calcite/src/test/resources/logback.xml b/integrations/apache-calcite/src/test/resources/logback.xml new file mode 100644 index 0000000..dd243bd --- /dev/null +++ b/integrations/apache-calcite/src/test/resources/logback.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<configuration xmlns="http://ch.qos.logback/xml/ns/logback" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://ch.qos.logback/xml/ns/logback + https://raw.githubusercontent.com/enricopulatzo/logback-XSD/master/src/main/xsd/logback.xsd"> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <!-- encoders are assigned the type + ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern> + </encoder> + </appender> + + <root level="info"> + <appender-ref ref="STDOUT" /> + </root> + +</configuration> \ No newline at end of file diff --git a/integrations/apache-calcite/src/test/resources/model.yml b/integrations/apache-calcite/src/test/resources/model.yml new file mode 100644 index 0000000..07b7479 --- /dev/null +++ b/integrations/apache-calcite/src/test/resources/model.yml @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to you under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# A JSON model of a simple Calcite schema. +# +version: 1.0 +defaultSchema: PLC4X +schemas: +- name: PLC4X + type: custom + factory: org.apache.plc4x.Plc4xSchemaFactory + operand: + config: /Users/julian/Develop/incubator-plc4x/integrations/apache-calcite/src/test/resources/example.yml + limit: -1 +- name: PLC4X-TABLES + type: custom + factory: org.apache.plc4x.Plc4xSchemaFactory + operand: + config: /Users/julian/Develop/incubator-plc4x/integrations/apache-calcite/src/test/resources/example.yml + limit: 100 \ No newline at end of file diff --git a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java index 2fd50f7..5549432 100644 --- a/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java +++ b/plc4j/utils/scraper/src/main/java/org/apache/plc4x/java/scraper/config/JobConfiguration.java @@ -43,7 +43,7 @@ public class JobConfiguration { * @param fields Map from field alias (how it is named in the result map) to plc4x field query */ @JsonCreator - JobConfiguration(@JsonProperty(value = "name", required = true) String name, + public JobConfiguration(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "scrapeRate", required = true) int scrapeRate, @JsonProperty(value = "sources", required = true) List<String> sources, @JsonProperty(value = "fields", required = true) Map<String, String> fields) {