This is an automated email from the ASF dual-hosted git repository.
toulmean pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push:
new 7f23059 Make synchronizers configurable
new fa88db3 Merge pull request #369 from atoulme/synchronizer_config
7f23059 is described below
commit 7f23059aa20df0edc4f4a1ea811198fe8296ebab
Author: Antoine Toulme <[email protected]>
AuthorDate: Mon Jan 24 22:19:53 2022 -0800
Make synchronizers configurable
---
.../apache/tuweni/config/PropertyValidator.java | 15 ++++
.../tuweni/config/PropertyValidatorTest.java | 8 ++
.../org/apache/tuweni/ethclient/EthereumClient.kt | 75 +++++++++++--------
.../tuweni/ethclient/EthereumClientConfig.kt | 85 ++++++++++++++++++++--
...nizer.kt => FromBestBlockHeaderSynchronizer.kt} | 10 ++-
.../ethclient/FromUnknownParentSynchronizer.kt | 10 ++-
.../tuweni/ethclient/PeerStatusEthSynchronizer.kt | 7 +-
.../org/apache/tuweni/ethclient/Synchronizer.kt | 5 +-
8 files changed, 172 insertions(+), 43 deletions(-)
diff --git
a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
index c59c01d..7cbc3d3 100644
--- a/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
+++ b/config/src/main/java/org/apache/tuweni/config/PropertyValidator.java
@@ -103,6 +103,21 @@ public interface PropertyValidator<T> {
}
/**
+ * A validator that ensures a property, if present, is equal or greater than
the value.
+ *
+ * @param value The lower bound (inclusive).
+ * @return A validator that ensures a property, if present, is equal or
greater than the value.
+ */
+ static PropertyValidator<Number> isGreaterOrEqual(long value) {
+ return (key, position, currentValue) -> {
+ if (currentValue != null && (currentValue.longValue() < value)) {
+ return singleError(position, "Value of property '" + key + "' is less
than '" + value + "'");
+ }
+ return noErrors();
+ };
+ }
+
+ /**
* A validator that ensures a property, if present, has a value within a
given set.
*
* @param values The acceptable values.
diff --git
a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
index 17e44cf..01a63e0 100644
--- a/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
+++ b/config/src/test/java/org/apache/tuweni/config/PropertyValidatorTest.java
@@ -81,4 +81,12 @@ class PropertyValidatorTest {
assertEquals(1, errors.size());
assertEquals("Value of property 'foo' should be \"one\", \"two\", or
\"three \"", errors.get(0).getMessage());
}
+
+ @Test
+ void validatesEqualOrGreater() {
+ PropertyValidator<Number> longPropertyValidator =
PropertyValidator.isGreaterOrEqual(32L);
+ assertTrue(longPropertyValidator.validate("foo", null, 33L).isEmpty());
+ assertTrue(longPropertyValidator.validate("foo", null, 32L).isEmpty());
+ assertEquals(1, longPropertyValidator.validate("foo", null, 31L).size());
+ }
}
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
index 9f1a958..6bd6b93 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClient.kt
@@ -74,13 +74,13 @@ class EthereumClient(
}
private var metricsService: MetricsService? = null
- private val genesisFiles = HashMap<String, GenesisFile>()
- private val services = HashMap<String, RLPxService>()
- private val storageRepositories = HashMap<String, BlockchainRepository>()
- val peerRepositories = HashMap<String, EthereumPeerRepository>()
- private val dnsClients = HashMap<String, DNSClient>()
- private val discoveryServices = HashMap<String, DiscoveryService>()
- private val synchronizers = HashMap<String, Synchronizer>()
+ private val genesisFiles = mutableMapOf<String, GenesisFile>()
+ private val services = mutableMapOf<String, RLPxService>()
+ private val storageRepositories = mutableMapOf<String,
BlockchainRepository>()
+ val peerRepositories = mutableMapOf<String, EthereumPeerRepository>()
+ private val dnsClients = mutableMapOf<String, DNSClient>()
+ private val discoveryServices = mutableMapOf<String, DiscoveryService>()
+ private val synchronizers = mutableMapOf<String, Synchronizer>()
private val managerHandler = mutableListOf<DefaultCacheManager>()
@@ -272,28 +272,45 @@ class EthereumClient(
}
}
- val synchronizer = PeerStatusEthSynchronizer(
- repository = repository,
- client = service.getClient(ETH66) as EthRequestsManager,
- peerRepository = peerRepository,
- adapter = adapter
- )
- synchronizers[rlpxConfig.getName() + "status"] = synchronizer
- synchronizer.start()
- val parentSynchronizer = FromUnknownParentSynchronizer(
- repository = repository,
- client = service.getClient(ETH66) as EthRequestsManager,
- peerRepository = peerRepository
- )
- synchronizers[rlpxConfig.getName() + "parent"] = parentSynchronizer
- parentSynchronizer.start()
- val bestSynchronizer = FromBestBlockSynchronizer(
- repository = repository,
- client = service.getClient(ETH66) as EthRequestsManager,
- peerRepository = peerRepository
- )
- synchronizers[rlpxConfig.getName() + "best"] = bestSynchronizer
- bestSynchronizer.start()
+ for (sync in config.synchronizers()) {
+ when (sync.getType()) {
+ SynchronizerType.best -> {
+ val bestSynchronizer = FromBestBlockHeaderSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository,
+ from = sync.getFrom(),
+ to = sync.getTo(),
+ )
+ bestSynchronizer.start()
+ synchronizers[sync.getName()] = bestSynchronizer
+ }
+ SynchronizerType.status -> {
+ val synchronizer = PeerStatusEthSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository,
+ adapter = adapter,
+ from = sync.getFrom(),
+ to = sync.getTo(),
+ )
+ synchronizer.start()
+ synchronizers[sync.getName()] = synchronizer
+ }
+ SynchronizerType.parent -> {
+ val parentSynchronizer = FromUnknownParentSynchronizer(
+ repository = repository,
+ client = service.getClient(ETH66) as EthRequestsManager,
+ peerRepository = peerRepository,
+ from = sync.getFrom(),
+ to = sync.getTo(),
+ )
+ parentSynchronizer.start()
+ synchronizers[sync.getName()] = parentSynchronizer
+ }
+ }
+ }
+
logger.info("Finished configuring Ethereum client
${rlpxConfig.getName()}")
}
}
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
index 64c2d76..83833ea 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/EthereumClientConfig.kt
@@ -26,6 +26,7 @@ import org.apache.tuweni.config.SchemaBuilder
import org.apache.tuweni.crypto.SECP256K1
import org.apache.tuweni.devp2p.parseEnodeUri
import org.apache.tuweni.eth.genesis.GenesisFile
+import org.apache.tuweni.units.bigints.UInt256
import org.slf4j.LoggerFactory
import java.io.FileNotFoundException
import java.net.URI
@@ -186,6 +187,21 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
}
}
+ fun synchronizers(): List<SynchronizerConfiguration> {
+ val synchronizers = config.sections("synchronizer")
+ if (synchronizers == null || synchronizers.isEmpty()) {
+ return emptyList()
+ }
+ return synchronizers.map { section ->
+ val sectionConfig = config.getConfigurationSection("proxy.$section")
+ SynchronizerConfigurationImpl(
+ section, SynchronizerType.valueOf(sectionConfig.getString("type")),
+ UInt256.valueOf(sectionConfig.getLong("from")),
+ UInt256.valueOf(sectionConfig.getLong("to"))
+ )
+ }
+ }
+
fun validate(): Stream<ConfigurationError> {
val schema = createSchema()
var errors = schema.validate(this.config)
@@ -205,7 +221,10 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
private fun validateSubsection(name: String, schema: Schema):
Stream<ConfigurationError> {
var errors = listOf<ConfigurationError>().stream()
for (subSection in this.config.sections(name)) {
- errors = Stream.concat(errors,
schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection")))
+ errors = Stream.concat(
+ errors,
+
schema.getSubSection(name).validate(this.config.getConfigurationSection("$name.$subSection"))
+ )
}
return errors
}
@@ -250,7 +269,12 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
val discoverySection = SchemaBuilder.create()
discoverySection.addString("identity", "", "Node identity", null)
discoverySection.addString("networkInterface", "127.0.0.1", "Network
interface to bind", null)
- discoverySection.addInteger("port", 0, "Port to expose the discovery
service on", PropertyValidator.isValidPortOrZero())
+ discoverySection.addInteger(
+ "port",
+ 0,
+ "Port to expose the discovery service on",
+ PropertyValidator.isValidPortOrZero()
+ )
discoverySection.addString("peerRepository", "default", "Peer repository
to which records should go", null)
val genesis = SchemaBuilder.create()
@@ -276,7 +300,12 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
}
)
rlpx.addInteger("port", 0, "Port to expose the RLPx service on",
PropertyValidator.isValidPortOrZero())
- rlpx.addInteger("advertisedPort", 30303, "Port to advertise in
communications as the RLPx service port", PropertyValidator.isValidPort())
+ rlpx.addInteger(
+ "advertisedPort",
+ 30303,
+ "Port to advertise in communications as the RLPx service port",
+ PropertyValidator.isValidPort()
+ )
rlpx.addString("clientName", "Apache Tuweni", "Name of the Ethereum
client", null)
rlpx.addString("repository", "default", "Name of the blockchain
repository", null)
rlpx.addString("peerRepository", "default", "Peer repository to which
records should go", null)
@@ -287,6 +316,17 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
val peerRepositoriesSection = SchemaBuilder.create()
peerRepositoriesSection.addString("type", "memory", "Peer repository
type", PropertyValidator.anyOf("memory"))
+
+ val synchronizersSection = SchemaBuilder.create()
+ synchronizersSection.addString(
+ "type",
+ "status",
+ "Synchronizer type",
+ PropertyValidator.anyOf("status", "parent", "best")
+ )
+ synchronizersSection.addLong("from", 0L, "Start block to sync from",
PropertyValidator.isGreaterOrEqual(0L))
+ synchronizersSection.addLong("to", 0L, "End block to sync to",
PropertyValidator.isGreaterOrEqual(0L))
+
val builder = SchemaBuilder.create()
builder.addSection("metrics", metricsSection.toSchema())
builder.addSection("storage", storageSection.toSchema())
@@ -297,6 +337,7 @@ class EthereumClientConfig(private var config:
Configuration = Configuration.emp
builder.addSection("genesis", genesis.toSchema())
builder.addSection("proxy", proxiesSection.toSchema())
builder.addSection("peerRepository", peerRepositoriesSection.toSchema())
+ builder.addSection("synchronizer", synchronizersSection.toSchema())
return builder.toSchema()
}
@@ -379,7 +420,19 @@ interface PeerRepositoryConfiguration {
fun getType(): String
}
-internal class PeerRepositoryConfigurationImpl(private val repoName: String,
private val type: String) : PeerRepositoryConfiguration {
+enum class SynchronizerType {
+ best, status, parent
+}
+
+interface SynchronizerConfiguration {
+ fun getName(): String
+ fun getType(): SynchronizerType
+ fun getFrom(): UInt256?
+ fun getTo(): UInt256?
+}
+
+internal class PeerRepositoryConfigurationImpl(private val repoName: String,
private val type: String) :
+ PeerRepositoryConfiguration {
override fun getName(): String = repoName
override fun getType(): String = type
}
@@ -414,7 +467,8 @@ internal data class RLPxServiceConfigurationImpl(
override fun peerRepository(): String = peerRepository
}
-internal data class GenesisFileConfigurationImpl(private val name: String,
private val genesisFilePath: URI) : GenesisFileConfiguration {
+internal data class GenesisFileConfigurationImpl(private val name: String,
private val genesisFilePath: URI) :
+ GenesisFileConfiguration {
override fun getName(): String = name
override fun genesisFile(): GenesisFile =
@@ -470,16 +524,33 @@ data class DiscoveryConfigurationImpl(
override fun getPort() = port
}
-data class StaticPeersConfigurationImpl(private val enodes: List<String>,
private val peerRepository: String) : StaticPeersConfiguration {
+data class StaticPeersConfigurationImpl(private val enodes: List<String>,
private val peerRepository: String) :
+ StaticPeersConfiguration {
override fun enodes(): List<String> = enodes
override fun peerRepository() = peerRepository
}
-data class ProxyConfigurationImpl(private val name: String, private val
upstream: String, private val downstream: String) : ProxyConfiguration {
+data class ProxyConfigurationImpl(
+ private val name: String,
+ private val upstream: String,
+ private val downstream: String,
+) : ProxyConfiguration {
override fun name() = name
override fun upstream() = upstream
override fun downstream() = downstream
}
+
+data class SynchronizerConfigurationImpl(
+ private val name: String,
+ private val type: SynchronizerType,
+ private val from: UInt256?,
+ private val to: UInt256?,
+) : SynchronizerConfiguration {
+ override fun getName() = name
+ override fun getType() = type
+ override fun getFrom(): UInt256? = from
+ override fun getTo(): UInt256? = to
+}
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
similarity index 91%
rename from
eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
rename to
eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
index 4edb80a..d70a3bc 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromBestBlockHeaderSynchronizer.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.launch
import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
@@ -29,13 +30,15 @@ import kotlin.coroutines.CoroutineContext
const val BEST_PEER_DELAY: Long = 5000
const val HEADERS_RESPONSE_TIMEOUT: Long = 10000
-class FromBestBlockSynchronizer(
+class FromBestBlockHeaderSynchronizer(
executor: ExecutorService = Executors.newSingleThreadExecutor(),
coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
repository: BlockchainRepository,
client: EthRequestsManager,
peerRepository: EthereumPeerRepository,
-) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
+ from: UInt256?,
+ to: UInt256?,
+) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository, from, to) {
override fun start() {
launch {
@@ -51,6 +54,9 @@ class FromBestBlockSynchronizer(
}
private fun askNextBestHeaders(header: BlockHeader) {
+ if ((null != from && header.number < from) || (null != to && header.number
> to)) {
+ return
+ }
launch {
if (peerRepository.activeConnections().count() == 0L) {
askNextBestHeaders(header)
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
index 2e9b9c4..6222608 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/FromUnknownParentSynchronizer.kt
@@ -22,6 +22,7 @@ import kotlinx.coroutines.launch
import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
@@ -34,8 +35,10 @@ class FromUnknownParentSynchronizer(
coroutineContext: CoroutineContext = executor.asCoroutineDispatcher(),
repository: BlockchainRepository,
client: EthRequestsManager,
- peerRepository: EthereumPeerRepository
-) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
+ peerRepository: EthereumPeerRepository,
+ from: UInt256?,
+ to: UInt256?
+) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository, from, to) {
var listenerId: String? = null
@@ -54,6 +57,9 @@ class FromUnknownParentSynchronizer(
if (header.number.isZero) {
return
}
+ if ((null != from && header.number < from) || (null != to && header.number
> to)) {
+ return
+ }
val parentHash = header.parentHash ?: return
launch {
delay(DELAY)
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
index 1df2b15..0752b13 100644
---
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
+++
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/PeerStatusEthSynchronizer.kt
@@ -21,6 +21,7 @@ import kotlinx.coroutines.launch
import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
@@ -39,8 +40,10 @@ class PeerStatusEthSynchronizer(
repository: BlockchainRepository,
client: EthRequestsManager,
peerRepository: EthereumPeerRepository,
- private val adapter: WireConnectionPeerRepositoryAdapter
-) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository) {
+ private val adapter: WireConnectionPeerRepositoryAdapter,
+ from: UInt256?,
+ to: UInt256?
+) : Synchronizer(executor, coroutineContext, repository, client,
peerRepository, from, to) {
var listenerId: String? = null
diff --git
a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
index e0db77d..2cac6a3 100644
--- a/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
+++ b/eth-client/src/main/kotlin/org/apache/tuweni/ethclient/Synchronizer.kt
@@ -25,6 +25,7 @@ import org.apache.tuweni.devp2p.eth.EthRequestsManager
import org.apache.tuweni.eth.BlockHeader
import org.apache.tuweni.eth.Hash
import org.apache.tuweni.eth.repository.BlockchainRepository
+import org.apache.tuweni.units.bigints.UInt256
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@@ -37,7 +38,9 @@ abstract class Synchronizer(
override val coroutineContext: CoroutineContext =
executor.asCoroutineDispatcher(),
val repository: BlockchainRepository,
val client: EthRequestsManager,
- val peerRepository: EthereumPeerRepository
+ val peerRepository: EthereumPeerRepository,
+ val from: UInt256?,
+ val to: UInt256?
) : CoroutineScope {
abstract fun start()
abstract fun stop()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]