This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch add_jsonrpc_downloader in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
commit 6210f78df5614c7957136606d6a244162ddfcc26 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Fri May 26 19:22:18 2023 -0700 Add new module, JSON-RPC downloader --- jsonrpc-downloader/README.md | 20 +++ jsonrpc-downloader/build.gradle | 52 ++++++ .../tuweni/jsonrpc/downloader/DownloadState.kt | 3 + .../apache/tuweni/jsonrpc/downloader/Downloader.kt | 194 +++++++++++++++++++++ .../tuweni/jsonrpc/downloader/DownloaderConfig.kt | 50 ++++++ .../tuweni/jsonrpc/downloader/DownloaderTest.kt | 41 +++++ settings.gradle | 1 + 7 files changed, 361 insertions(+) diff --git a/jsonrpc-downloader/README.md b/jsonrpc-downloader/README.md new file mode 100644 index 000000000..a8753b874 --- /dev/null +++ b/jsonrpc-downloader/README.md @@ -0,0 +1,20 @@ +<!--- +Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE +file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file +to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the +License. You may obtain a copy of the License at + * +http://www.apache.org/licenses/LICENSE-2.0 + * +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. + ---> +# JSONRPC downloader + +| Status | | +|----------------|---------------| +| Stability | [development] | +| Component Type | [application] | + +The `jsonrpc downloader` application downloads blocks and transactions from a JSON-RPC endpoint and stores them in files. 
diff --git a/jsonrpc-downloader/build.gradle b/jsonrpc-downloader/build.gradle
new file mode 100644
index 000000000..e05625284
--- /dev/null
+++ b/jsonrpc-downloader/build.gradle
@@ -0,0 +1,52 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
 * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
 * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
 * License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 */
plugins { id 'application' }

description = 'Ethereum JSON-RPC downloader'

dependencies {
  implementation project(':app-commons')
  implementation project(':bytes')
  implementation project(':config')
  implementation project(':crypto')
  implementation project(':concurrent')
  implementation project(':eth')
  implementation project(':jsonrpc')
  implementation project(':kv')
  implementation project(':metrics')
  implementation project(':net')
  implementation project(':units')

  implementation 'com.fasterxml.jackson.core:jackson-databind'
  implementation 'org.bouncycastle:bcprov-jdk15on'
  implementation 'io.vertx:vertx-core'
  implementation 'io.vertx:vertx-lang-kotlin-coroutines'
  implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core'
  implementation 'org.slf4j:slf4j-api'
  implementation 'org.jetbrains.kotlin:kotlin-stdlib'
  implementation 'org.postgresql:postgresql'
  implementation 'javax.xml.bind:jaxb-api'

  testImplementation project(':junit')
  testImplementation 'org.junit.jupiter:junit-jupiter-api'
  testImplementation 'org.junit.jupiter:junit-jupiter-params'

  testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'

  runtimeOnly 'ch.qos.logback:logback-classic'
}

application {
  mainClassName = 'org.apache.tuweni.jsonrpc.downloader.DownloaderApp'
  applicationName = 'jsonrpc-downloader'
}
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt
new file mode 100644
index 000000000..a42285449
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloadState.kt
@@ -0,0 +1,3 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tuweni.jsonrpc.downloader

/**
 * Inclusive range of block numbers that a previous run recorded as downloaded.
 *
 * Persisted as JSON in the `.offset` file of the downloader's output directory.
 *
 * @param start first block of the recorded range
 * @param end last block of the recorded range
 */
data class DownloadState(val start: Int, val end: Int)
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt
new file mode 100644
index 000000000..08d8e9828
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/Downloader.kt
@@ -0,0 +1,194 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tuweni.jsonrpc.downloader

import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.buffer.Buffer
import io.vertx.kotlin.coroutines.await
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.apache.tuweni.app.commons.ApplicationUtils
import org.apache.tuweni.jsonrpc.JSONRPCClient
import org.bouncycastle.jce.provider.BouncyCastleProvider
import org.slf4j.LoggerFactory
import java.io.IOException
import java.lang.Exception
import java.lang.Integer.max
import java.lang.Integer.min
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.security.Security
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.CoroutineContext
import kotlin.math.round
import kotlin.system.exitProcess

/**
 * Logger shared by [DownloaderApp] and [Downloader].
 */
val logger = LoggerFactory.getLogger(Downloader::class.java)

/**
 * Application downloading chain data from a JSON-RPC endpoint.
 *
 * Usage: `jsonrpc-downloader [config file]`. When no argument is given, `config.toml` is read.
 */
object DownloaderApp {

  /**
   * Parses the command line, loads the configuration and runs a [Downloader] to completion.
   *
   * Exits with status 0 after printing the version or usage. Exits with status 1 when the configuration
   * has errors or the download fails.
   *
   * @param args `--version`, `--help`/`-h`, or the path to the TOML configuration file
   */
  @JvmStatic
  fun main(args: Array<String>) {
    runBlocking {
      if (args.contains("--version")) {
        println("Apache Tuweni JSON-RPC downloader ${ApplicationUtils.version}")
        exitProcess(0)
      }
      if (args.contains("--help") || args.contains("-h")) {
        println("USAGE: jsonrpc-downloader <config file>")
        exitProcess(0)
      }
      ApplicationUtils.renderBanner("Loading JSON-RPC downloader")
      // Registered once; the provider was previously added twice.
      Security.addProvider(BouncyCastleProvider())
      val configFile = Paths.get(if (args.isNotEmpty()) args[0] else "config.toml")

      val config = DownloaderConfig(configFile)
      if (config.config.hasErrors()) {
        for (error in config.config.errors()) {
          println(error.message)
        }
        exitProcess(1)
      }
      val vertx = Vertx.vertx(VertxOptions().setWorkerPoolSize(config.numberOfThreads()))
      // The factory must hand the Runnable it receives to the new thread. A thread built without it
      // never executes the pool's tasks, so any coroutine dispatched to the pool would never run.
      val pool = Executors.newFixedThreadPool(config.numberOfThreads()) { runnable ->
        val thread = Thread(runnable, "downloader")
        thread.isDaemon = true
        thread
      }
      val downloader = Downloader(vertx, config, pool.asCoroutineDispatcher())
      logger.info("Starting download")
      try {
        downloader.loopDownload()
      } catch (e: Exception) {
        logger.error("Fatal error downloading blocks", e)
        exitProcess(1)
      }
      logger.info("Completed download")

      vertx.close()
      pool.shutdown()
    }
  }
}

/**
 * Downloads blocks from a JSON-RPC endpoint and writes each one as a pretty-printed JSON file in
 * [DownloaderConfig.outputPath].
 *
 * The range already downloaded is recorded in a `.offset` file in the output directory. Later runs only
 * fetch the parts of the configured range outside of it.
 *
 * @param vertx Vert.x instance used for the JSON-RPC client and for file writes
 * @param config downloader configuration
 * @param coroutineContext context the download coroutines run on; it is also passed to the JSON-RPC client
 */
class Downloader(val vertx: Vertx, val config: DownloaderConfig, override val coroutineContext: CoroutineContext) :
  CoroutineScope {

  val jsonRpcClient: JSONRPCClient = JSONRPCClient(vertx, config.url(), coroutineContext = this.coroutineContext)
  val objectMapper = ObjectMapper()

  /**
   * Downloads every configured block not covered by the recorded state, then records the configured range
   * as downloaded.
   *
   * Blocks are fetched concurrently, one coroutine per block, on this downloader's [coroutineContext].
   * Progress is logged every 5 seconds.
   *
   * If any block fails, the remaining downloads are cancelled, the state file is left untouched and the
   * failure is rethrown.
   */
  suspend fun loopDownload() = withContext(this.coroutineContext.minusKey(Job)) {
    // minusKey(Job) keeps the caller's Job as parent, so the download stays cancellable by the caller.
    val state = readInitialState()
    val intervals = createMissingIntervals(state)
    logger.info("Working with intervals $intervals")
    // IntRange bounds are inclusive, hence the + 1.
    val length = intervals.sumOf { interval -> maxOf(0, interval.last - interval.first + 1) }
    // Updated from several threads when the context is a multi-threaded dispatcher.
    val completed = AtomicInteger()
    val progress = launch {
      while (true) {
        delay(5000)
        logger.info("Progress ${round(completed.get() * 100.0 * 100 / length) / 100}")
      }
    }
    val jobs = mutableListOf<Job>()
    for (interval in intervals) {
      for (i in interval) {
        jobs.add(
          launch {
            try {
              val block = downloadBlock(i)
              writeBlock(i, block)
            } catch (e: CancellationException) {
              // A sibling failed and the scope is shutting down; nothing to report here.
              throw e
            } catch (e: Exception) {
              logger.error("Error downloading block $i, aborting", e)
              // Rethrowing fails the scope: siblings are cancelled and writeFinalState() is skipped,
              // so a failed block is never recorded as downloaded.
              throw e
            }
            completed.incrementAndGet()
          },
        )
      }
    }
    jobs.joinAll()
    // The reporter loops forever; stop it so the scope can complete.
    progress.cancel()
    writeFinalState()
  }

  /**
   * Fetches one block and renders it as pretty-printed JSON.
   *
   * The `true` flag passed to the client presumably requests full transaction objects. TODO confirm
   * against JSONRPCClient.
   */
  private suspend fun downloadBlock(blockNumber: Int): String {
    val blockJson = jsonRpcClient.getBlockByBlockNumber(blockNumber, true)
    return objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(blockJson)
  }

  /**
   * Writes [block] to `block-<16-digit zero-padded number>.json` in the output directory.
   */
  private suspend fun writeBlock(blockNumber: Int, block: String) {
    val filePath = Paths.get(config.outputPath(), "block-${blockNumber.toString().padStart(16, '0')}.json")
    vertx.fileSystem().writeFile(filePath.toString(), Buffer.buffer(block)).await()
  }

  /**
   * Computes the parts of the configured `start..end` range that [state] does not cover.
   *
   * Bounds are inclusive. The boundary blocks of [state] may be included in the result and downloaded again.
   *
   * @param state the range recorded as downloaded by a previous run
   * @return zero, one or two ranges, below and/or above [state]
   */
  fun createMissingIntervals(state: DownloadState): List<IntRange> {
    val intervals = mutableListOf<IntRange>()
    if (config.start() < state.start) {
      intervals.add(config.start()..min(state.start, config.end()))
    }
    if (state.end < config.end()) {
      intervals.add(max(config.start(), state.end)..config.end())
    }

    return intervals
  }

  /**
   * Reads the recorded download range from the `.offset` file.
   *
   * Returns `DownloadState(0, 0)` when the file is missing or unreadable.
   */
  private fun readInitialState(): DownloadState {
    val offsetFile = Path.of(config.outputPath(), ".offset")
    if (!Files.exists(offsetFile)) {
      return DownloadState(0, 0)
    }
    return try {
      // DownloadState has no default or @JsonCreator constructor, so a plain ObjectMapper cannot bind to it.
      // Reading the tree and picking the fields avoids needing the Jackson Kotlin module.
      val node = objectMapper.readTree(Files.readString(offsetFile))
      DownloadState(node.path("start").asInt(0), node.path("end").asInt(0))
    } catch (e: IOException) {
      logger.warn("Could not read download state from $offsetFile, downloading the full range", e)
      DownloadState(0, 0)
    }
  }

  /**
   * Records the configured range as downloaded in the `.offset` file, replacing any previous state.
   */
  private fun writeFinalState() {
    val state = DownloadState(config.start(), config.end())
    val json = objectMapper.writeValueAsString(state)
    Files.writeString(Path.of(config.outputPath(), ".offset"), json)
  }
}
diff --git a/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt
b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt
new file mode 100644
index 000000000..dbe7c63ca
--- /dev/null
+++ b/jsonrpc-downloader/src/main/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderConfig.kt
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tuweni.jsonrpc.downloader

import org.apache.tuweni.config.Configuration
import org.apache.tuweni.config.SchemaBuilder
import java.nio.file.Path

/**
 * Configuration of the JSON-RPC downloader, read from TOML.
 *
 * @param filePath path to a TOML configuration file; takes precedence over [configContents]
 * @param configContents inline TOML configuration, used when [filePath] is null
 */
class DownloaderConfig(filePath: Path? = null, configContents: String? = null) {

  companion object {

    /**
     * Schema of the downloader configuration: each key with its default value and description.
     */
    fun schema() = SchemaBuilder.create()
      .addInteger("numberOfThreads", 10, "Number of threads for each thread pool", null)
      .addString("outputPath", "", "Path to output block data", null)
      .addInteger("start", 0, "First block to scrape", null)
      // NOTE(review): Downloader calls end() unconditionally, so the "continue to ask for new blocks"
      // behaviour described below is not implemented yet.
      .addInteger("end", null, "Last block to scrape. If unset, the scrape will continue to ask for new blocks", null)
      .addString("url", null, "URL of the JSON-RPC service to query for information", null)
      .toSchema()
  }

  /**
   * Parsed configuration. Inline contents are checked against [schema] as well. That gives them the same
   * defaults and validation as a configuration file; without it, `numberOfThreads`, `outputPath` and
   * `start` would have no value.
   */
  val config = when {
    filePath != null -> Configuration.fromToml(filePath, schema())
    configContents != null -> Configuration.fromToml(configContents, schema())
    else -> Configuration.empty(schema())
  }

  /** Number of threads of the downloader thread pool and of the Vert.x worker pool. */
  fun numberOfThreads() = config.getInteger("numberOfThreads")

  /** Directory where block files and the `.offset` state file are written. */
  fun outputPath() = config.getString("outputPath")

  /** First block to download. */
  fun start() = config.getInteger("start")

  /** Last block to download. */
  fun end() = config.getInteger("end")

  /** URL of the JSON-RPC endpoint. */
  fun url() = config.getString("url")
}
diff --git a/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt b/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt
new file mode 100644
index 000000000..f6964ca71
--- /dev/null
+++ b/jsonrpc-downloader/src/test/kotlin/org/apache/tuweni/jsonrpc/downloader/DownloaderTest.kt
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tuweni.jsonrpc.downloader

import io.vertx.core.Vertx
import kotlinx.coroutines.Dispatchers
import org.apache.tuweni.config.Configuration
import org.apache.tuweni.junit.VertxExtension
import org.apache.tuweni.junit.VertxInstance
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith

@ExtendWith(VertxExtension::class)
class DownloaderTest {

  @Test
  fun testIntervals(@VertxInstance vertx: Vertx) {
    val config = DownloaderConfig(
      configContents = """
        start=10
        end=20
        url="example.com"
      """.trimIndent(),
    )
    val downloader = Downloader(vertx, config, Dispatchers.Default)
    var intervals = downloader.createMissingIntervals(DownloadState(0, 0))
    assertEquals(1, intervals.size)
    assertEquals(10..20, intervals[0])
    intervals = downloader.createMissingIntervals(DownloadState(10, 20))
    assertEquals(0, intervals.size)
    intervals = downloader.createMissingIntervals(DownloadState(25, 40))
    assertEquals(1, intervals.size)
    assertEquals(10..20, intervals[0])
    intervals = downloader.createMissingIntervals(DownloadState(5, 15))
    assertEquals(1, intervals.size)
    assertEquals(15..20, intervals[0])
    intervals = downloader.createMissingIntervals(DownloadState(15, 25))
    assertEquals(1, intervals.size)
    assertEquals(10..15, intervals[0])
    intervals = downloader.createMissingIntervals(DownloadState(12, 18))
    assertEquals(2, intervals.size)
    assertEquals(10..12, intervals[0])
    assertEquals(18..20, intervals[1])
  }

  @Test
  fun testInlineConfigUsesSchemaDefaults() {
    val config = DownloaderConfig(
      configContents = """
        end=20
        url="example.com"
      """.trimIndent(),
    )
    assertFalse(config.config.hasErrors())
    assertEquals(10, config.numberOfThreads())
    assertEquals("", config.outputPath())
    assertEquals(0, config.start())
    assertEquals(20, config.end())
  }
}
diff --git a/settings.gradle b/settings.gradle
index fd46989ec..648e74d5f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -43,6 +43,7 @@ include 'hobbits-relayer'
 include 'io'
 include 'jsonrpc'
 include 'jsonrpc-app'
+include 'jsonrpc-downloader'
 include 'junit'
 include 'kademlia'
 include 'kv'
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@tuweni.apache.org
For additional commands, e-mail: commits-h...@tuweni.apache.org